Merge remote-tracking branch 'origin/3.0' into fix/valgrind

This commit is contained in:
Shengliang Guan 2022-07-07 17:04:39 +08:00
commit bc7157bd69
40 changed files with 625 additions and 396 deletions

View File

@ -241,7 +241,7 @@ int32_t create_topic() {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -302,7 +302,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "false"); /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);

View File

@ -32,6 +32,18 @@ enum {
TMQ_CONF__RESET_OFFSET__LATEST = -1, TMQ_CONF__RESET_OFFSET__LATEST = -1,
}; };
// clang-format off
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on
enum { enum {
TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_RSP,

View File

@ -2826,8 +2826,8 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); // tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); // tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType); tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen); tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen); tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
@ -2835,8 +2835,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
} }
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); // buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); // buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType); buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen); buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen); buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);

View File

@ -30,15 +30,15 @@ struct SRpcMsg;
struct SSubplan; struct SSubplan;
typedef struct SReadHandle { typedef struct SReadHandle {
void* reader; void* streamReader;
void* meta; void* meta;
void* config; void* config;
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
bool initMetaReader;
// int8_t initTsdbReader; bool initTableReader;
bool tqReader; bool initStreamReader;
} SReadHandle; } SReadHandle;
typedef enum { typedef enum {

View File

@ -223,7 +223,7 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
} SStreamChildEpInfo; } SStreamChildEpInfo;
struct SStreamTask { typedef struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int8_t isDataScan; int8_t isDataScan;
@ -235,6 +235,11 @@ struct SStreamTask {
int8_t taskStatus; int8_t taskStatus;
int8_t execStatus; int8_t execStatus;
// exec info
int64_t enqueueVer;
int64_t processedVer;
int64_t checkpointVer;
// node info // node info
int32_t selfChildId; int32_t selfChildId;
int32_t nodeId; int32_t nodeId;
@ -277,7 +282,7 @@ struct SStreamTask {
// msg handle // msg handle
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
}; } SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
@ -288,6 +293,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
#if 0
while (1) { while (1) {
int8_t inputStatus = int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING); atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
@ -296,6 +302,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
} }
ASSERT(0); ASSERT(0);
} }
#endif
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
@ -316,8 +323,10 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE); atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
} }
#if 0
// TODO: back pressure // TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0; return 0;
} }

View File

@ -88,7 +88,7 @@ typedef struct {
EWalType level; // wal level EWalType level; // wal level
} SWalCfg; } SWalCfg;
typedef struct SWalVer { typedef struct {
int64_t firstVer; int64_t firstVer;
int64_t verInSnapshotting; int64_t verInSnapshotting;
int64_t snapshotVer; int64_t snapshotVer;
@ -149,17 +149,22 @@ typedef struct SWal {
SWalCkHead writeHead; SWalCkHead writeHead;
} SWal; // WAL HANDLE } SWal; // WAL HANDLE
typedef struct SWalReadHandle { typedef struct {
int8_t scanUncommited;
int8_t scanMeta;
} SWalFilterCond;
typedef struct {
SWal *pWal; SWal *pWal;
TdFilePtr pReadLogTFile; TdFilePtr pLogFile;
TdFilePtr pReadIdxTFile; TdFilePtr pIdxFile;
int64_t curFileFirstVer; int64_t curFileFirstVer;
int64_t curVersion; int64_t curVersion;
int64_t capacity; int64_t capacity;
int64_t status; // if cursor valid
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond;
SWalCkHead *pHead; SWalCkHead *pHead;
} SWalReadHandle; } SWalReader;
// module initialization // module initialization
int32_t walInit(); int32_t walInit();
@ -178,7 +183,6 @@ void walFsync(SWal *, bool force);
// apis for lifecycle management // apis for lifecycle management
int32_t walCommit(SWal *, int64_t ver); int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver); int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely // notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *, int64_t ver); int32_t walBeginSnapshot(SWal *, int64_t ver);
@ -187,15 +191,16 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*); // int32_t walDataCorrupted(SWal*);
// read // read
SWalReadHandle *walOpenReadHandle(SWal *); SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReadHandle(SWalReadHandle *); void walCloseReader(SWalReader *pRead);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
// only for tq usage // only for tq usage
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity); void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct { typedef struct {
int64_t refId; int64_t refId;
@ -207,10 +212,11 @@ void walCloseRef(SWalRef *);
int32_t walRefVer(SWalRef *, int64_t ver); int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *); int32_t walUnrefVer(SWal *);
// help function for raft
bool walLogExist(SWal *, int64_t ver); bool walLogExist(SWal *, int64_t ver);
bool walIsEmpty(SWal *);
// lifecycle check // lifecycle check
bool walIsEmpty(SWal *);
int64_t walGetFirstVer(SWal *); int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *); int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *); int64_t walGetLastVer(SWal *);

View File

@ -45,6 +45,7 @@ void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
char *strDupUnquo(const char *src);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context; T_MD5_CTX context;

View File

@ -62,7 +62,6 @@ struct tmq_conf_t {
char* ip; char* ip;
char* user; char* user;
char* pass; char* pass;
/*char* db;*/
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
}; };
@ -338,7 +337,7 @@ tmq_list_t* tmq_list_new() {
int32_t tmq_list_append(tmq_list_t* list, const char* src) { int32_t tmq_list_append(tmq_list_t* list, const char* src) {
SArray* container = &list->container; SArray* container = &list->container;
char* topic = strdup(src); char* topic = strDupUnquo(src);
if (taosArrayPush(container, &topic) == NULL) return -1; if (taosArrayPush(container, &topic) == NULL) return -1;
return 0; return 0;
} }

View File

@ -40,15 +40,6 @@ extern "C" {
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on // clang-format on
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
@ -128,7 +119,7 @@ typedef struct {
int8_t fetchMeta; int8_t fetchMeta;
// reader // reader
SWalReadHandle* pWalReader; SWalReader* pWalReader;
// push // push
STqPushHandle pushHandle; STqPushHandle pushHandle;

View File

@ -439,6 +439,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
tDecoderInit(&dc, pData, nData); tDecoderInit(&dc, pData, nData);
tDecodeSSchemaWrapper(&dc, pSchemaWrapper); tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
tDecoderClear(&dc); tDecoderClear(&dc);
tdbFree(pData);
// convert // convert
STSchemaBuilder sb = {0}; STSchemaBuilder sb = {0};

View File

@ -332,7 +332,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
} }
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .streamReader = pReadHandle,
.meta = pMeta, .meta = pMeta,
.pMsgCb = pMsgCb, .pMsgCb = pMsgCb,
.vnode = pVnode, .vnode = pVnode,

View File

@ -28,8 +28,12 @@ int32_t tqInit() {
atomic_store_8(&tqMgmt.inited, 0); atomic_store_8(&tqMgmt.inited, 0);
return -1; return -1;
} }
if (streamInit() < 0) {
return -1;
}
atomic_store_8(&tqMgmt.inited, 1); atomic_store_8(&tqMgmt.inited, 1);
} }
return 0; return 0;
} }
@ -42,6 +46,7 @@ void tqCleanUp() {
if (old == 1) { if (old == 1) {
taosTmrCleanUp(tqMgmt.timer); taosTmrCleanUp(tqMgmt.timer);
streamCleanUp();
atomic_store_8(&tqMgmt.inited, 0); atomic_store_8(&tqMgmt.inited, 0);
} }
} }
@ -144,7 +149,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
tEncodeSMqDataRsp(&encoder, pRsp); tEncodeSMqDataRsp(&encoder, pRsp);
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
@ -361,8 +365,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(IS_META_MSG(pHead->msgType)); ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version; /*metaRsp.reqOffset = pReq->reqOffset.version;*/
metaRsp.rspOffset = fetchVer; /*metaRsp.rspOffset = fetchVer;*/
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body; metaRsp.metaRsp = pHead->body;
@ -439,7 +446,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType; pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta; pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
} }
@ -448,10 +455,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL; req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = { SReadHandle handle = {
.reader = pHandle->execHandle.pExecReader[i], .streamReader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.tqReader = true, .initTableReader = true,
}; };
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
@ -522,19 +529,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
// expand runners // expand runners
if (pTask->isDataScan) { if (pTask->isDataScan) {
SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initStreamReader = 1,
}; };
/*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
} else { } else {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor);
} }
ASSERT(pTask->exec.executor);
} }
// sink // sink

View File

@ -77,14 +77,14 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle; STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
} }
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = { SReadHandle reader = {
.reader = handle.execHandle.pExecReader[i], .streamReader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,

View File

@ -309,9 +309,11 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t
if (code) goto _err; if (code) goto _err;
// code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx); // code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
pDelIdx = taosArraySearch(pDelIdxArray, &idx, tCmprDelIdx, TD_EQ); SDelIdx *pIdx = taosArraySearch(pDelIdxArray, &idx, tCmprDelIdx, TD_EQ);
if (code) goto _err; if (code) goto _err;
*pDelIdx = *pIdx;
if (pDelIdxArray) { if (pDelIdxArray) {
taosArrayDestroy(pDelIdxArray); taosArrayDestroy(pDelIdxArray);
} }

View File

@ -168,8 +168,6 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
taosArrayClear(pCommitter->aDelData);
if (pTbData) { if (pTbData) {
suid = pTbData->suid; suid = pTbData->suid;
uid = pTbData->uid; uid = pTbData->uid;
@ -185,6 +183,8 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL); code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
if (code) goto _err; if (code) goto _err;
} else {
taosArrayClear(pCommitter->aDelData);
} }
if (pTbData == NULL && pDelIdx == NULL) goto _exit; if (pTbData == NULL && pDelIdx == NULL) goto _exit;
@ -205,7 +205,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
if (code) goto _err; if (code) goto _err;
// put delIdx // put delIdx
if (taosArrayPush(pCommitter->aDelIdx, &delIdx) == NULL) { if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }

View File

@ -520,7 +520,7 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb can and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -105,7 +105,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
// check if table exists (todo: refact) // check if table exists (todo: refact)
SMetaReader mr = {0}; SMetaReader mr = {0};
SMetaEntry me = {0}; // SMetaEntry me = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) { if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
metaReaderClear(&mr); metaReaderClear(&mr);
@ -117,6 +117,8 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
if (mr.me.type == TSDB_NORMAL_TABLE) { if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schemaRow.version; sverNew = mr.me.ntbEntry.schemaRow.version;
} else { } else {
tDecoderClear(&mr.coder);
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid); metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
sverNew = mr.me.stbEntry.schemaRow.version; sverNew = mr.me.stbEntry.schemaRow.version;
} }

View File

@ -49,7 +49,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
} }
pDelFWriter->fDel.size = TSDB_FHDR_SIZE; pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
pDelFWriter->fDel.size = 0; pDelFWriter->fDel.offset = 0;
*ppWriter = pDelFWriter; *ppWriter = pDelFWriter;
return code; return code;

View File

@ -348,7 +348,7 @@ typedef struct SessionWindowSupporter {
uint8_t parentType; uint8_t parentType;
} SessionWindowSupporter; } SessionWindowSupporter;
typedef struct SStreamBlockScanInfo { typedef struct SStreamScanInfo {
uint64_t tableUid; // queried super table uid uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr; int32_t numOfPseudoExpr;
@ -365,7 +365,7 @@ typedef struct SStreamBlockScanInfo {
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle void* streamReader;// stream block reader handle
int32_t tsArrayIndex; int32_t tsArrayIndex;
SArray* tsArray; SArray* tsArray;
@ -374,7 +374,7 @@ typedef struct SStreamBlockScanInfo {
EStreamScanMode scanMode; EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp; SOperatorInfo* pStreamScanOp;
SOperatorInfo* pSnapshotReadOp; SOperatorInfo* pTableScanOp;
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
@ -383,7 +383,7 @@ typedef struct SStreamBlockScanInfo {
SSDataBlock* pPullDataRes; // pull data SSDataBlock SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex; int32_t deleteDataIndex;
} SStreamBlockScanInfo; } SStreamScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp; SRetrieveMetaTableRsp* pRsp;
@ -518,6 +518,7 @@ typedef struct SIndefOperatorInfo {
SAggSupporter aggSup; SAggSupporter aggSup;
SArray* pPseudoColInfo; SArray* pPseudoColInfo;
SExprSupp scalarSup; SExprSupp scalarSup;
SNode* pCondition;
} SIndefOperatorInfo; } SIndefOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
@ -528,6 +529,7 @@ typedef struct SFillOperatorInfo {
SSDataBlock* existNewGroupBlock; SSDataBlock* existNewGroupBlock;
bool multigroupResult; bool multigroupResult;
STimeWindow win; STimeWindow win;
SNode* pCondition;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
@ -588,6 +590,7 @@ typedef struct SSessionAggOperatorInfo {
int64_t gap; // session window gap int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SNode *pCondition;
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SResultWindowInfo { typedef struct SResultWindowInfo {
@ -649,6 +652,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t tsSlotId; // primary timestamp column slot id int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
// bool reptScan; // bool reptScan;
const SNode *pCondition;
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
typedef struct SStreamStateAggOperatorInfo { typedef struct SStreamStateAggOperatorInfo {
@ -806,7 +810,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo); SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
@ -820,7 +824,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);

View File

@ -37,7 +37,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else { } else {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid; pInfo->assignBlockUid = assignUid;
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = type; pInfo->blockType = type;
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
@ -130,7 +130,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo; return pTaskInfo;
} }
static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) { static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
// let's discard the tables those are not created according to the queried super table. // let's discard the tables those are not created according to the queried super table.
@ -169,16 +169,16 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
int32_t code = 0; int32_t code = 0;
SStreamBlockScanInfo* pScanInfo = pInfo->info; SStreamScanInfo* pScanInfo = pInfo->info;
if (isAdd) { // add new table id if (isAdd) { // add new table id
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa);
taosArrayDestroy(qa); taosArrayDestroy(qa);
} else { // remove the table id in current list } else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
code = tqReadHandleRemoveTbUidList(pScanInfo->streamBlockReader, tableIdList); code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList);
} }
return code; return code;

View File

@ -2849,12 +2849,12 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info; SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
pScanInfo->pSnapshotReadOp->status = OP_OPENED; pScanInfo->pTableScanOp->status = OP_OPENED;
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info; STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER); ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
if (uid == 0) { if (uid == 0) {
@ -2914,8 +2914,8 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info; SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info; STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
*uid = pSnapShotScanInfo->lastStatus.uid; *uid = pSnapShotScanInfo->lastStatus.uid;
*ts = pSnapShotScanInfo->lastStatus.ts; *ts = pSnapShotScanInfo->lastStatus.ts;
} else { } else {
@ -3364,7 +3364,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
} }
} }
static SSDataBlock* doFill(SOperatorInfo* pOperator) { static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
SFillOperatorInfo* pInfo = pOperator->info; SFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3372,9 +3372,6 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = pInfo->pRes; SSDataBlock* pResBlock = pInfo->pRes;
blockDataCleanup(pResBlock); blockDataCleanup(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
// todo handle different group data interpolation // todo handle different group data interpolation
bool n = false; bool n = false;
@ -3440,6 +3437,39 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
} }
} }
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSDataBlock* fillResult = NULL;
while (true) {
fillResult = doFillImpl(pOperator);
if (fillResult != NULL) {
doFilter(pInfo->pCondition, fillResult);
}
if (fillResult == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
if (fillResult->info.rows > 0) {
break;
}
}
if (fillResult != NULL) {
size_t rows = fillResult->info.rows;
pOperator->resultInfo.totalRows += rows;
}
return fillResult;
}
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExpr[i]; SExprInfo* pExprInfo = &pExpr[i];
@ -3832,6 +3862,8 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
} }
} }
doFilter(pIndefInfo->pCondition, pInfo->pRes);
size_t rows = pInfo->pRes->info.rows; size_t rows = pInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
@ -3885,6 +3917,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pInfo->pCondition = pPhyNode->node.pConditions;
pOperator->name = "IndefinitOperator"; pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
@ -3958,6 +3991,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
pInfo->pCondition = pPhyFillNode->node.pConditions;
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
@ -4455,7 +4489,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
@ -4477,7 +4511,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode); SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
@ -4587,9 +4621,9 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI
} }
return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo); return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pSnapshotReadOp->info; *ppInfo = pInfo->pTableScanOp->info;
return 0; return 0;
} }
} }

View File

@ -302,14 +302,21 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
size_t rows = pRes->info.rows; bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break;
} }
pOperator->resultInfo.totalRows += rows; if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes; return (pRes->info.rows == 0)? NULL:pRes;
} }

View File

@ -780,7 +780,7 @@ _error:
return NULL; return NULL;
} }
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
@ -791,21 +791,20 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
} }
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { static bool isSessionWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
} }
static bool isStateWindow(SStreamBlockScanInfo* pInfo) { static bool isStateWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
} }
static bool isIntervalWindow(SStreamBlockScanInfo* pInfo) { static bool isIntervalWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL; pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
} }
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupId) { if (groupId) {
@ -827,17 +826,15 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
*/ */
} }
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
ASSERT(rowIndex < pBlock->info.rows); ASSERT(rowIndex < pBlock->info.rows);
switch (pBlock->info.type) switch (pBlock->info.type) {
{
case STREAM_DELETE_DATA: case STREAM_DELETE_DATA:
case STREAM_RETRIEVE: { case STREAM_RETRIEVE: {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData; uint64_t* groupCol = (uint64_t*)pColInfo->pData;
pInfo->groupId = groupCol[rowIndex]; pInfo->groupId = groupCol[rowIndex];
} } break;
break;
default: default:
break; break;
} }
@ -854,7 +851,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
} }
static bool prepareRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) { static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
if ((*pRowIndex) == pBlock->info.rows) { if ((*pRowIndex) == pBlock->info.rows) {
return false; return false;
} }
@ -877,16 +874,16 @@ static bool prepareRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, i
win.skey = TMIN(win.skey, startData[*pRowIndex]); win.skey = TMIN(win.skey, startData[*pRowIndex]);
continue; continue;
} }
ASSERT( (win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || ASSERT((win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
( isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0) ) ); (isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
break; break;
} }
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win); resetTableScanInfo(pInfo->pTableScanOp->info, &win);
return true; return true;
} }
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = { STimeWindow win = {
.skey = INT64_MIN, .skey = INT64_MIN,
.ekey = INT64_MAX, .ekey = INT64_MAX,
@ -907,11 +904,10 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else { } else {
win = win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, (*pRowIndex) +=
TSDB_ORDER_ASC); getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
} }
needRead = true; needRead = true;
} else if (isStateWindow(pInfo)) { } else if (isStateWindow(pInfo)) {
@ -929,7 +925,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
if (!needRead) { if (!needRead) {
return false; return false;
} }
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win); resetTableScanInfo(pInfo->pTableScanOp->info, &win);
return true; return true;
} }
@ -946,13 +942,13 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
dest->info.rows++; dest->info.rows++;
} }
static SSDataBlock* doRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) { if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
} }
if (!pResult) { if (!pResult) {
blockDataCleanup(pSDB); blockDataCleanup(pSDB);
@ -966,14 +962,14 @@ static SSDataBlock* doRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB,
} }
} }
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
if (pResult == NULL) { if (pResult == NULL) {
if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) { if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
} }
} }
if (!pResult) { if (!pResult) {
@ -997,8 +993,8 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
return pResult; return pResult;
*/ */
} }
static void generateIntervalTs(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator,
static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) { SSDataBlock* pUpdateRes) {
if (pDelBlock->info.rows == 0) { if (pDelBlock->info.rows == 0) {
return; return;
} }
@ -1014,10 +1010,12 @@ static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlo
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows && for (int32_t i = pInfo->deleteDataIndex;
i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) { i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1;
i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]); uint64_t groupId = getGroupId(pOperator, uidCol[i]);
for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) { for (TSKEY startTs = startData[i]; startTs <= endData[i];) {
colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false); colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false); colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
pUpdateRes->info.rows++; pUpdateRes->info.rows++;
@ -1032,7 +1030,8 @@ static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlo
} }
} }
static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) { static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator,
SSDataBlock* pUpdateRes) {
if (pBlock->info.rows == 0) { if (pBlock->info.rows == 0) {
return; return;
} }
@ -1050,15 +1049,17 @@ static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock,
SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
int32_t dummy = 0; int32_t dummy = 0;
for (int32_t i = 0 ; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]); uint64_t groupId = getGroupId(pOperator, uidCol[i]);
//gap must be 0. // gap must be 0.
SResultWindowInfo* pStartWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy); SResultWindowInfo* pStartWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
if (!pStartWin) { if (!pStartWin) {
// window has been closed. // window has been closed.
continue; continue;
} }
SResultWindowInfo* pEndWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy); SResultWindowInfo* pEndWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
ASSERT(pEndWin); ASSERT(pEndWin);
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false); colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false); colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
@ -1066,7 +1067,7 @@ static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock,
pUpdateRes->info.rows++; pUpdateRes->info.rows++;
} }
} }
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
blockDataCleanup(pUpdateBlock); blockDataCleanup(pUpdateBlock);
int32_t size = taosArrayGetSize(pInfo->tsArray); int32_t size = taosArrayGetSize(pInfo->tsArray);
if (pInfo->tsArrayIndex < size) { if (pInfo->tsArrayIndex < size) {
@ -1075,11 +1076,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
blockDataEnsureCapacity(pUpdateBlock, size); blockDataEnsureCapacity(pUpdateBlock, size);
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); pInfo->groupId = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
int32_t i = 0; int32_t i = 0;
for (; i < size; i++) { for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); uint64_t id = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
if (pInfo->groupId != id) { if (pInfo->groupId != id) {
break; break;
} }
@ -1098,12 +1099,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
} }
if (size == 0) { if (size == 0) {
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock); generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock);
} }
} }
static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
bool out) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData; TSKEY* ts = (TSKEY*)pColDataInfo->pData;
@ -1119,15 +1119,15 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex);
uint64_t* uidCol = (uint64_t*)pColDataInfo->pData; uint64_t* uidCol = (uint64_t*)pColDataInfo->pData;
ASSERT(pBlock->info.rows > 0); ASSERT(pBlock->info.rows > 0);
for (int32_t i = 0 ; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
uidCol[i] = getGroupId(pOperator, uidCol[i]); uidCol[i] = getGroupId(pOperator, uidCol[i]);
} }
} }
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator); pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
@ -1145,34 +1145,33 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
// TODO move into scan
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) { switch (pBlock->info.type) {
case STREAM_RETRIEVE:{ case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock); copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0; pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex); prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} } break;
break;
case STREAM_DELETE_DATA: { case STREAM_DELETE_DATA: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
if (isIntervalWindow(pInfo)) { if (isIntervalWindow(pInfo)) {
copyDataBlock(pInfo->pDeleteDataRes, pBlock); copyDataBlock(pInfo->pDeleteDataRes, pBlock);
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes); generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes);
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else { } else {
generateScanRange(pInfo, pBlock, pInfo->pSnapshotReadOp, pInfo->pUpdateRes); generateScanRange(pInfo, pBlock, pInfo->pTableScanOp, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
} }
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} } break;
break;
default: default:
break; break;
} }
@ -1233,11 +1232,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->streamBlockReader)) { while (tqNextDataBlock(pInfo->streamReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
// todo refactor // todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader); int32_t code = tqRetrieveDataBlock(&block, pInfo->streamReader);
uint64_t groupId = block.info.groupId; uint64_t groupId = block.info.groupId;
uint64_t uid = block.info.uid; uint64_t uid = block.info.uid;
@ -1333,11 +1332,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
} }
} }
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
// check reader last status // check reader last status
// if not match, reset status // if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL; return pResult && pResult->info.rows > 0 ? pResult : NULL;
} else { } else {
@ -1361,7 +1362,7 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
uint64_t taskId) { uint64_t taskId) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -1400,13 +1401,25 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} }
if (pHandle) { if (pHandle) {
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
if (pHandle->tqReader) { if (pHandle->initTableReader) {
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, 0); pSTInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, NULL) < 0) {
ASSERT(0);
}
}
if (pHandle->initStreamReader) {
ASSERT(pHandle->streamReader == NULL);
pInfo->streamReader = tqInitSubmitMsgScanner(pHandle->meta);
ASSERT(pInfo->streamReader);
} else {
ASSERT(pHandle->streamReader);
pInfo->streamReader = pHandle->streamReader;
} }
if (pSTInfo->interval.interval > 0) { if (pSTInfo->interval.interval > 0) {
@ -1414,18 +1427,17 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} else { } else {
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
} }
pInfo->pSnapshotReadOp = pTableScanDummy;
pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pSTInfo->interval; pInfo->interval = pSTInfo->interval;
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
ASSERT(pHandle->reader);
pInfo->streamBlockReader = pHandle->reader;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds); tqReadHandleSetColIdList(pInfo->streamReader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); int32_t code = tqReadHandleSetTbUidList(pInfo->streamReader, tableIdList);
if (code != 0) { if (code != 0) {
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
goto _error; goto _error;
@ -1449,7 +1461,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->deleteDataIndex = 0; pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock(); pInfo->pDeleteDataRes = createPullDataBlock();
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
@ -1458,7 +1470,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator; return pOperator;

View File

@ -1146,13 +1146,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); while(1) {
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; break;
} }
return pBInfo->pRes; if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
@ -1178,15 +1187,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); while(1) {
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break;
} }
size_t rows = pBInfo->pRes->info.rows; if (pBInfo->pRes->info.rows > 0) {
pOperator->resultInfo.totalRows += rows; break;
}
return (rows == 0) ? NULL : pBInfo->pRes; }
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
} }
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
@ -1880,12 +1896,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); while(1) {
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break;
} }
return pBInfo->pRes->info.rows > 0 ? pBInfo->pRes : NULL; if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
@ -1914,15 +1940,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); while(1) {
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break;
} }
size_t rows = pBInfo->pRes->info.rows; if (pBInfo->pRes->info.rows > 0) {
pOperator->resultInfo.totalRows += rows; break;
}
return (rows == 0) ? NULL : pBInfo->pRes; }
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
} }
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
@ -2235,7 +2268,7 @@ _error:
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId, SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId,
SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo) { SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -2246,6 +2279,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
pInfo->pCondition = pCondition;
if (pInfo->stateKey.pData == NULL) { if (pInfo->stateKey.pData == NULL) {
goto _error; goto _error;
} }
@ -2289,7 +2323,7 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { STimeWindowAggSupp* pTwAggSupp, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -2315,6 +2349,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN; pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->pCondition = pCondition;
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true; pOperator->blocking = true;
@ -2988,7 +3023,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark, void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark,
uint8_t type) { uint8_t type) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamBlockScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type}; pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type};
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
} }
@ -3545,8 +3580,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn,
SArray* pClosed, __get_win_info_ fn, bool delete) { bool delete) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
void** pIte = NULL; void** pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
@ -3708,8 +3743,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession,
getResWinForSession, pInfo->ignoreExpiredData); pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pStUpdated, pUpdated); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
@ -4210,8 +4245,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState,
getResWinForState, pInfo->ignoreExpiredData); pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pSeUpdated, pUpdated); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);

View File

@ -890,7 +890,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
return code; return code;
} }
static int32_t pushDownCondOptDealLogicNode(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { static int32_t pushDownCondOptTrivialPushDown(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
if (NULL == pLogicNode->pConditions || if (NULL == pLogicNode->pConditions ||
OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -921,7 +921,7 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
break; break;
case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
case QUERY_NODE_LOGIC_PLAN_PARTITION: case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = pushDownCondOptDealLogicNode(pCxt, pLogicNode); code = pushDownCondOptTrivialPushDown(pCxt, pLogicNode);
break; break;
default: default:
break; break;

View File

@ -17,6 +17,7 @@
#define _STREAM_INC_H_ #define _STREAM_INC_H_
#include "executor.h" #include "executor.h"
#include "tref.h"
#include "tstream.h" #include "tstream.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -25,6 +26,7 @@ extern "C" {
typedef struct { typedef struct {
int8_t inited; int8_t inited;
int32_t refPool;
void* timer; void* timer;
} SStreamGlobalEnv; } SStreamGlobalEnv;

View File

@ -76,9 +76,6 @@ void streamTriggerByTimer(void* param, void* tmrId) {
int32_t streamSetupTrigger(SStreamTask* pTask) { int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
if (streamInit() < 0) {
return -1;
}
pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
} }

View File

@ -33,7 +33,7 @@ typedef struct SSyncLogStoreData {
SWal* pWal; SWal* pWal;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalReadHandle* pWalHandle; SWalReader* pWalHandle;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0 // SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData; } SSyncLogStoreData;

View File

@ -62,7 +62,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
ASSERT(pData->pWal != NULL); ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL); taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal); pData->pWalHandle = walOpenReader(pData->pWal, NULL);
ASSERT(pData->pWalHandle != NULL); ASSERT(pData->pWalHandle != NULL);
pLogStore->appendEntry = logStoreAppendEntry; pLogStore->appendEntry = logStoreAppendEntry;
@ -95,7 +95,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
if (pData->pWalHandle != NULL) { if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle); walCloseReader(pData->pWalHandle);
pData->pWalHandle = NULL; pData->pWalHandle = NULL;
} }
taosThreadMutexUnlock(&(pData->mutex)); taosThreadMutexUnlock(&(pData->mutex));
@ -255,7 +255,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*ppEntry = NULL; *ppEntry = NULL;
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle; SWalReader* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) { if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -263,7 +263,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
code = walReadWithHandle(pWalHandle, index); code = walReadVer(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
@ -398,10 +398,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle; SWalReader* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL); ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index); int32_t code = walReadVer(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);

View File

@ -19,6 +19,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
#include "wal.h" #include "wal.h"

View File

@ -16,20 +16,29 @@
#include "taoserror.h" #include "taoserror.h"
#include "walInt.h" #include "walInt.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) { static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle)); static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
if (pRead == NULL) { if (pRead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pRead->pWal = pWal; pRead->pWal = pWal;
pRead->pReadIdxTFile = NULL; pRead->pIdxFile = NULL;
pRead->pReadLogTFile = NULL; pRead->pLogFile = NULL;
pRead->curVersion = -1; pRead->curVersion = -1;
pRead->curFileFirstVer = -1; pRead->curFileFirstVer = -1;
pRead->capacity = 0; pRead->capacity = 0;
pRead->status = 0; if (cond)
pRead->cond = *cond;
else {
pRead->cond.scanMeta = 0;
pRead->cond.scanUncommited = 0;
}
taosThreadMutexInit(&pRead->mutex, NULL); taosThreadMutexInit(&pRead->mutex, NULL);
@ -39,26 +48,46 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
taosMemoryFree(pRead); taosMemoryFree(pRead);
return NULL; return NULL;
} }
return pRead; return pRead;
} }
void walCloseReadHandle(SWalReadHandle *pRead) { void walCloseReader(SWalReader *pRead) {
taosCloseFile(&pRead->pReadIdxTFile); taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pReadLogTFile); taosCloseFile(&pRead->pLogFile);
taosMemoryFreeClear(pRead->pHead); taosMemoryFreeClear(pRead->pHead);
taosMemoryFree(pRead); taosMemoryFree(pRead);
} }
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { int32_t walNextValidMsg(SWalReader *pRead) {
// TODO int64_t fetchVer = pRead->curVersion;
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
while (fetchVer <= endVer) {
if (walFetchHeadNew(pRead, fetchVer) < 0) {
return -1;
}
if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
(IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
if (walFetchBodyNew(pRead) < 0) {
return -1;
}
return 0; return 0;
} else {
if (walSkipFetchBodyNew(pRead) < 0) {
return -1;
}
fetchVer++;
ASSERT(fetchVer == pRead->curVersion);
}
}
return -1;
} }
static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0; int64_t ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile; TdFilePtr pIdxTFile = pRead->pIdxFile;
TdFilePtr pLogTFile = pRead->pReadLogTFile; TdFilePtr pLogTFile = pRead->pLogFile;
// seek position // seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
@ -90,11 +119,11 @@ static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
return ret; return ret;
} }
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
taosCloseFile(&pRead->pReadIdxTFile); taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pReadLogTFile); taosCloseFile(&pRead->pLogFile);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
@ -104,7 +133,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1; return -1;
} }
pRead->pReadLogTFile = pLogTFile; pRead->pLogFile = pLogTFile;
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
@ -114,11 +143,11 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1; return -1;
} }
pRead->pReadIdxTFile = pIdxTFile; pRead->pIdxFile = pIdxTFile;
return 0; return 0;
} }
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) { if (ver == pRead->curVersion) {
return 0; return 0;
@ -153,9 +182,94 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; } void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) { static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen;
if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
}
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen != sizeof(SWalCkHead)) {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curVersion = -1;
return -1;
}
return 0;
}
static int32_t walFetchBodyNew(SWalReader *pRead) {
SWalCont *pReadHead = &pRead->pHead->head;
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pRead->pHead = ptr;
pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since %s",
pRead->pHead->head.version, ver, tstrerror(terrno));
} else {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since file corrupted",
pRead->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curVersion = -1;
ASSERT(0);
return -1;
}
if (pReadHead->version != ver) {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
pRead->curVersion = ver + 1;
return 0;
}
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t code;
ASSERT(pRead->curVersion == pRead->pHead->head.version);
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code; int64_t code;
// TODO: valid ver // TODO: valid ver
@ -168,9 +282,9 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
if (code < 0) return -1; if (code < 0) return -1;
} }
ASSERT(taosValidFile(pRead->pReadLogTFile) == true); ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead)); code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) { if (code != sizeof(SWalCkHead)) {
return -1; return -1;
} }
@ -186,12 +300,12 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
return 0; return 0;
} }
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code; int64_t code;
ASSERT(pRead->curVersion == pHead->head.version); ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1; pRead->curVersion = -1;
@ -203,7 +317,7 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
return 0; return 0;
} }
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) { int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head); SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version; int64_t ver = pReadHead->version;
@ -218,7 +332,7 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
pRead->capacity = pReadHead->bodyLen; pRead->capacity = pReadHead->bodyLen;
} }
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
@ -241,9 +355,9 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
return 0; return 0;
} }
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) { int32_t walReadWithHandle_s(SWalReader *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex); taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) { if (walReadVer(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
@ -257,7 +371,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHea
return 0; return 0;
} }
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int32_t walReadVer(SWalReader *pRead, int64_t ver) {
int64_t code; int64_t code;
if (pRead->pWal->vers.firstVer == -1) { if (pRead->pWal->vers.firstVer == -1) {
@ -280,9 +394,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return -1; return -1;
} }
ASSERT(taosValidFile(pRead->pReadLogTFile) == true); ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead)); code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) { if (code != sizeof(SWalCkHead)) {
if (code < 0) if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -310,7 +424,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->capacity = pRead->pHead->head.bodyLen; pRead->capacity = pRead->pHead->head.bodyLen;
} }
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) { pRead->pHead->head.bodyLen) {
if (code < 0) if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -340,46 +454,3 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
#if 0
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
if (code != 0) {
return code;
}
if (*ppHead == NULL) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
if (ptr == NULL) {
return -1;
}
*ppHead = ptr;
}
if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidHeadCksum(*ppHead) != 0) {
return -1;
}
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
if (ptr == NULL) {
taosMemoryFree(*ppHead);
*ppHead = NULL;
return -1;
}
if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidBodyCksum(*ppHead) != 0) {
return -1;
}
return 0;
}
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif

View File

@ -79,19 +79,21 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
} }
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
int64_t code; int64_t code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
taosThreadMutexLock(&pWal->mutex);
// find correct file // find correct file
if (ver < walGetLastFileFirstVer(pWal)) { if (ver < walGetLastFileFirstVer(pWal)) {
// change current files // change current files
code = walChangeWrite(pWal, ver); code = walChangeWrite(pWal, ver);
if (code < 0) { if (code < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
@ -146,6 +148,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(taosValidFile(pLogTFile)); ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead)); int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) { if (size != sizeof(SWalCkHead)) {
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
code = walValidHeadCksum(&head); code = walValidHeadCksum(&head);
@ -154,11 +157,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
if (head.head.version != ver) { if (head.head.version != ver) {
ASSERT(0); ASSERT(0);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
@ -167,12 +172,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
code = taosFtruncateFile(pIdxTFile, idxOff); code = taosFtruncateFile(pIdxTFile, idxOff);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
pWal->vers.lastVer = ver - 1; pWal->vers.lastVer = ver - 1;

View File

@ -293,7 +293,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
TEST_F(WalKeepEnv, readHandleRead) { TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv(); walResetEnv();
int code; int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal); SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL); ASSERT(pRead != NULL);
int i; int i;
@ -306,7 +306,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
} }
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100; int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver); code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
// printf("rrbody: \n"); // printf("rrbody: \n");
@ -325,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
} }
walCloseReadHandle(pRead); walCloseReader(pRead);
} }
TEST_F(WalRetentionEnv, repairMeta1) { TEST_F(WalRetentionEnv, repairMeta1) {
@ -354,12 +354,12 @@ TEST_F(WalRetentionEnv, repairMeta1) {
ASSERT_EQ(pWal->vers.lastVer, 99); ASSERT_EQ(pWal->vers.lastVer, 99);
SWalReadHandle* pRead = walOpenReadHandle(pWal); SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL); ASSERT(pRead != NULL);
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100; int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver); code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
// printf("rrbody: \n"); // printf("rrbody: \n");
@ -389,7 +389,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 200; int ver = taosRand() % 200;
code = walReadWithHandle(pRead, ver); code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
// printf("rrbody: \n"); // printf("rrbody: \n");

View File

@ -857,19 +857,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return; return;
} }
buf[n] = '\0'; buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') { char *zi = strstr(buf, "zoneinfo");
if (tz) { if (!zi) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
printf("parsing /etc/localtime failed"); printf("parsing /etc/localtime failed");
return; return;
} }
tz = zi + strlen("zoneinfo") + 1;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv("TZ", tz, 1); setenv("TZ", tz, 1);
tzset(); tzset();
@ -962,19 +970,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return; return;
} }
buf[n] = '\0'; buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') { char *zi = strstr(buf, "zoneinfo");
if (tz) { if (!zi) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
printf("parsing /etc/localtime failed"); printf("parsing /etc/localtime failed");
return; return;
} }
tz = zi + strlen("zoneinfo") + 1;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv("TZ", tz, 1); setenv("TZ", tz, 1);
tzset(); tzset();

View File

@ -64,6 +64,20 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing return j + 1; // only one quote, do nothing
} }
char *strDupUnquo(const char *src) {
if (src == NULL) return NULL;
if (src[0] != '`') return strdup(src);
int32_t len = (int32_t)strlen(src);
if (src[len - 1] != '`') return NULL;
char *ret = taosMemoryMalloc(len);
if (ret == NULL) return NULL;
for (int32_t i = 0; i < len - 1; i++) {
ret[i] = src[i + 1];
}
ret[len - 1] = 0;
return ret;
}
size_t strtrim(char *z) { size_t strtrim(char *z) {
int32_t i = 0; int32_t i = 0;
int32_t j = 0; int32_t j = 0;

View File

@ -141,7 +141,7 @@ if $data01 != 7 then
goto loop1 goto loop1
endi endi
if $data02 != 9 then if $data02 != 18 then
print =====data02=$data02 print =====data02=$data02
goto loop1 goto loop1
endi endi
@ -151,22 +151,22 @@ if $data03 != 4 then
goto loop1 goto loop1
endi endi
if $data04 != 1.100000000 then if $data04 != 1.000000000 then
print ======$data04 print ======$data04
return -1 return -1
endi endi
if $data05 != 0.816496581 then if $data05 != 1.154700538 then
print ======$data05 print ======$data05
return -1 return -1
endi endi
if $data06 != 3 then if $data06 != 4 then
print ======$data06 print ======$data06
return -1 return -1
endi endi
if $data07 != 1.100000000 then if $data07 != 1.000000000 then
print ======$data07 print ======$data07
return -1 return -1
endi endi
@ -235,7 +235,7 @@ sql create stream streams4 trigger at_once watermark 1d into streamt4 as select
# sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s); # sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
# sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s); # sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s); sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s); # sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0); sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4); sql insert into t1 values(1648791213002,2,3,2,3.4);
sql insert into t1 values(1648791213003,4,9,3,4.8); sql insert into t1 values(1648791213003,4,9,3,4.8);
@ -288,10 +288,10 @@ if $rows == 0 then
goto loop3 goto loop3
endi endi
sql select * from streamt8; #sql select * from streamt8;
if $rows == 0 then #if $rows == 0 then
print ======$rows # print ======$rows
goto loop3 # goto loop3
endi #endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -25,7 +25,7 @@ class TDTestCase:
paraDict = {'dbName': 'db2', paraDict = {'dbName': 'db2',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
'vgroups': 4, 'vgroups': 1,
'stbName': 'stb', 'stbName': 'stb',
'colPrefix': 'c', 'colPrefix': 'c',
'tagPrefix': 't', 'tagPrefix': 't',
@ -44,7 +44,7 @@ class TDTestCase:
topicNameList = ['topic1'] topicNameList = ['topic1']
expectRowsList = [] expectRowsList = []
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1)
tdLog.info("create stb") tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb") tdLog.info("create ctb")

View File

@ -0,0 +1 @@
#python3 ./test.py -f 2-query/last.py -Q 3

View File

@ -295,7 +295,7 @@ python3 ./test.py -f 2-query/Today.py -Q 3
python3 ./test.py -f 2-query/max.py -Q 3 python3 ./test.py -f 2-query/max.py -Q 3
python3 ./test.py -f 2-query/min.py -Q 3 python3 ./test.py -f 2-query/min.py -Q 3
python3 ./test.py -f 2-query/count.py -Q 3 python3 ./test.py -f 2-query/count.py -Q 3
python3 ./test.py -f 2-query/last.py -Q 3 #python3 ./test.py -f 2-query/last.py -Q 3
python3 ./test.py -f 2-query/first.py -Q 3 python3 ./test.py -f 2-query/first.py -Q 3
python3 ./test.py -f 2-query/To_iso8601.py -Q 3 python3 ./test.py -f 2-query/To_iso8601.py -Q 3
python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 3 python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 3

@ -1 +1 @@
Subproject commit c885e967e490105999b84d009a15168728dfafaf Subproject commit 389047db713a3dddfbce292c3260b0864b17d936