From 3d141cf109ade50689e1d2fa651099d007de3b74 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 15 Dec 2021 17:17:50 +0800 Subject: [PATCH] refactor wal and tq --- include/dnode/vnode/tq/tq.h | 14 +- include/libs/wal/wal.h | 6 +- source/dnode/vnode/tq/inc/tqMetaStore.h | 22 +- source/dnode/vnode/tq/src/tq.c | 5 +- source/dnode/vnode/tq/src/tqMetaStore.c | 93 ++++--- source/libs/wal/inc/walInt.h | 8 + source/libs/wal/src/walMeta.c | 41 ++++ source/libs/wal/src/walMgmt.c | 8 +- source/libs/wal/src/walSeek.c | 20 +- source/libs/wal/src/walWrite.c | 5 +- src/query/src/qExecutor.c | 314 ++++++++++++------------ 11 files changed, 293 insertions(+), 243 deletions(-) diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 7359df92cc..747d97b7a0 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -226,21 +226,21 @@ typedef struct TqMetaHandle { int64_t serializedSize; void* valueInUse; void* valueInTxn; -} TqMetaHandle; +} STqMetaHandle; typedef struct TqMetaList { - TqMetaHandle handle; + STqMetaHandle handle; struct TqMetaList* next; //struct TqMetaList* inTxnPrev; //struct TqMetaList* inTxnNext; struct TqMetaList* unpersistPrev; struct TqMetaList* unpersistNext; -} TqMetaList; +} STqMetaList; typedef struct TqMetaStore { - TqMetaList* bucket[TQ_BUCKET_SIZE]; + STqMetaList* bucket[TQ_BUCKET_SIZE]; //a table head - TqMetaList* unpersistHead; + STqMetaList* unpersistHead; //TODO:temporaral use, to be replaced by unified tfile int fileFd; //TODO:temporaral use, to be replaced by unified tfile @@ -250,7 +250,7 @@ typedef struct TqMetaStore { TqSerializeFun pSerializer; TqDeserializeFun pDeserializer; TqDeleteFun pDeleter; -} TqMetaStore; +} STqMetaStore; typedef struct STQ { // the collection of group handle @@ -259,7 +259,7 @@ typedef struct STQ { STqCfg* tqConfig; TqLogReader* tqLogReader; TqMemRef tqMemRef; - TqMetaStore* tqMeta; + STqMetaStore* tqMeta; } STQ; // open in each vnode diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index e19d65837a..bafdc6c082 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -88,17 +88,17 @@ typedef struct SWalVer { typedef struct SWal { // cfg SWalCfg cfg; + int32_t fsyncSeq; + //meta SWalVer vers; - //file set int64_t writeLogTfd; int64_t writeIdxTfd; int32_t writeCur; SArray* fileInfoSet; - //statistics + //status int64_t totSize; int64_t lastRollSeq; //ctl - int32_t fsyncSeq; int64_t refId; pthread_mutex_t mutex; //path diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index b9e702a89a..4190125e44 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -24,29 +24,29 @@ extern "C" { #endif -TqMetaStore* tqStoreOpen(const char* path, +STqMetaStore* tqStoreOpen(const char* path, TqSerializeFun pSerializer, TqDeserializeFun pDeserializer, TqDeleteFun pDeleter, int32_t tqConfigFlag ); -int32_t tqStoreClose(TqMetaStore*); +int32_t tqStoreClose(STqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*); //int32_t tqStoreCommitAll(TqMetaStore*); -int32_t tqStorePersist(TqMetaStore*); +int32_t tqStorePersist(STqMetaStore*); //clean deleted idx and data from persistent file -int32_t tqStoreCompact(TqMetaStore*); +int32_t tqStoreCompact(STqMetaStore*); -void* tqHandleGet(TqMetaStore*, int64_t key); +void* tqHandleGet(STqMetaStore*, int64_t key); //make it unpersist -void* tqHandleTouchGet(TqMetaStore*, int64_t key); -int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); -int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); +void* tqHandleTouchGet(STqMetaStore*, int64_t key); +int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value); +int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize); //delete committed kv pair //notice that a delete action still needs to be committed -int32_t tqHandleDel(TqMetaStore*, int64_t key); -int32_t tqHandleCommit(TqMetaStore*, int64_t key); -int32_t tqHandleAbort(TqMetaStore*, int64_t key); +int32_t tqHandleDel(STqMetaStore*, int64_t key); +int32_t tqHandleCommit(STqMetaStore*, int64_t key); +int32_t tqHandleAbort(STqMetaStore*, int64_t key); #ifdef __cplusplus } diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index c1a46e567b..5888141c58 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -49,8 +49,8 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl pTq->path = strdup(path); pTq->tqConfig = tqConfig; pTq->tqLogReader = tqLogReader; - // pTq->tqMemRef.pAlloctorFactory = allocFac; - // pTq->tqMemRef.pAllocator = allocFac->create(allocFac); + pTq->tqMemRef.pAlloctorFactory = allocFac; + pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if(pTq->tqMemRef.pAllocator == NULL) { //TODO } @@ -202,7 +202,6 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) { return totSize; } - TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; } diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 81b48191bc..3f40c94f24 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -22,11 +22,10 @@ #define TQ_META_NAME "tq.meta" #define TQ_IDX_NAME "tq.idx" +static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value); +static void* tqHandleGetUncommitted(STqMetaStore*, int64_t key); -static int32_t tqHandlePutCommitted(TqMetaStore*, int64_t key, void* value); -static void* tqHandleGetUncommitted(TqMetaStore*, int64_t key); - -static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) { +static inline void tqLinkUnpersist(STqMetaStore *pMeta, STqMetaList* pNode) { if(pNode->unpersistNext == NULL) { pNode->unpersistNext = pMeta->unpersistHead->unpersistNext; pNode->unpersistPrev = pMeta->unpersistHead; @@ -68,18 +67,18 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { return lseek(fd, offset, SEEK_SET); } -TqMetaStore* tqStoreOpen(const char* path, +STqMetaStore* tqStoreOpen(const char* path, TqSerializeFun serializer, TqDeserializeFun deserializer, TqDeleteFun deleter, int32_t tqConfigFlag ) { - TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); + STqMetaStore* pMeta = malloc(sizeof(STqMetaStore)); if(pMeta == NULL) { //close return NULL; } - memset(pMeta, 0, sizeof(TqMetaStore)); + memset(pMeta, 0, sizeof(STqMetaStore)); //concat data file name and index file name size_t pathLen = strlen(path); @@ -105,14 +104,14 @@ TqMetaStore* tqStoreOpen(const char* path, } pMeta->idxFd = idxFd; - pMeta->unpersistHead = malloc(sizeof(TqMetaList)); + pMeta->unpersistHead = malloc(sizeof(STqMetaList)); if(pMeta->unpersistHead == NULL) { ASSERT(false); //close file //free memory return NULL; } - memset(pMeta->unpersistHead, 0, sizeof(TqMetaList)); + memset(pMeta->unpersistHead, 0, sizeof(STqMetaList)); pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead; @@ -149,11 +148,11 @@ TqMetaStore* tqStoreOpen(const char* path, ASSERT(idxBuf.head.writeOffset == idxRead); //loop read every entry for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { - TqMetaList *pNode = malloc(sizeof(TqMetaList)); + STqMetaList *pNode = malloc(sizeof(STqMetaList)); if(pNode == NULL) { //TODO: free memory and return error } - memset(pNode, 0, sizeof(TqMetaList)); + memset(pNode, 0, sizeof(STqMetaList)); memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); lseek(fileFd, pNode->handle.offset, SEEK_SET); @@ -199,7 +198,7 @@ TqMetaStore* tqStoreOpen(const char* path, //put into list int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; - TqMetaList* pBucketNode = pMeta->bucket[bucketKey]; + STqMetaList* pBucketNode = pMeta->bucket[bucketKey]; if(pBucketNode == NULL) { pMeta->bucket[bucketKey] = pNode; } else if(pBucketNode->handle.key == pNode->handle.key) { @@ -212,7 +211,7 @@ TqMetaStore* tqStoreOpen(const char* path, } if(pBucketNode->next) { ASSERT(pBucketNode->next->handle.key == pNode->handle.key); - TqMetaList *pNodeFound = pBucketNode->next; + STqMetaList *pNodeFound = pBucketNode->next; pNode->next = pNodeFound->next; pBucketNode->next = pNode; pBucketNode = pNodeFound; @@ -239,7 +238,7 @@ TqMetaStore* tqStoreOpen(const char* path, return pMeta; } -int32_t tqStoreClose(TqMetaStore* pMeta) { +int32_t tqStoreClose(STqMetaStore* pMeta) { //commit data and idx tqStorePersist(pMeta); ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next==NULL); @@ -247,7 +246,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { close(pMeta->idxFd); //free memory for(int i = 0; i < TQ_BUCKET_SIZE; i++) { - TqMetaList* pNode = pMeta->bucket[i]; + STqMetaList* pNode = pMeta->bucket[i]; while(pNode) { ASSERT(pNode->unpersistNext == NULL); ASSERT(pNode->unpersistPrev == NULL); @@ -259,7 +258,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { pMeta->pDeleter(pNode->handle.valueInUse); } - TqMetaList* next = pNode->next; + STqMetaList* next = pNode->next; free(pNode); pNode = next; } @@ -270,12 +269,12 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { return 0; } -int32_t tqStoreDelete(TqMetaStore* pMeta) { +int32_t tqStoreDelete(STqMetaStore* pMeta) { close(pMeta->fileFd); close(pMeta->idxFd); //free memory for(int i = 0; i < TQ_BUCKET_SIZE; i++) { - TqMetaList* pNode = pMeta->bucket[i]; + STqMetaList* pNode = pMeta->bucket[i]; pMeta->bucket[i] = NULL; while(pNode) { if(pNode->handle.valueInTxn @@ -286,7 +285,7 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { pMeta->pDeleter(pNode->handle.valueInUse); } - TqMetaList* next = pNode->next; + STqMetaList* next = pNode->next; free(pNode); pNode = next; } @@ -299,11 +298,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { } //TODO: wrap in tfile -int32_t tqStorePersist(TqMetaStore* pMeta) { +int32_t tqStorePersist(STqMetaStore* pMeta) { TqIdxPageBuf idxBuf; int64_t* bufPtr = (int64_t*)idxBuf.buffer; - TqMetaList *pHead = pMeta->unpersistHead; - TqMetaList *pNode = pHead->unpersistNext; + STqMetaList *pHead = pMeta->unpersistHead; + STqMetaList *pNode = pHead->unpersistNext; TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead)); if(pSHead == NULL) { //TODO: memory error @@ -384,11 +383,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { pNode->handle.valueInTxn == NULL ) { int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; - TqMetaList* pBucketHead = pMeta->bucket[bucketKey]; + STqMetaList* pBucketHead = pMeta->bucket[bucketKey]; if(pBucketHead == pNode) { pMeta->bucket[bucketKey] = pNode->next; } else { - TqMetaList* pBucketNode = pBucketHead; + STqMetaList* pBucketNode = pBucketHead; while(pBucketNode->next != NULL && pBucketNode->next != pNode) { pBucketNode = pBucketNode->next; @@ -415,9 +414,9 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { return 0; } -static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value) { +static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* value) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { //TODO: think about thread safety @@ -432,12 +431,12 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value pNode = pNode->next; } } - TqMetaList *pNewNode = malloc(sizeof(TqMetaList)); + STqMetaList *pNewNode = malloc(sizeof(STqMetaList)); if(pNewNode == NULL) { //TODO: memory error return -1; } - memset(pNewNode, 0, sizeof(TqMetaList)); + memset(pNewNode, 0, sizeof(STqMetaList)); pNewNode->handle.key = key; pNewNode->handle.valueInUse = value; //put into unpersist list @@ -448,9 +447,9 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value return 0; } -void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { +void* tqHandleGet(STqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { if(pNode->handle.valueInUse != NULL @@ -466,9 +465,9 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { return NULL; } -void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) { +void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { if(pNode->handle.valueInUse != NULL @@ -485,9 +484,9 @@ void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) { return NULL; } -static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* value) { +static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* value) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { //TODO: think about thread safety @@ -506,12 +505,12 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val pNode = pNode->next; } } - TqMetaList *pNewNode = malloc(sizeof(TqMetaList)); + STqMetaList *pNewNode = malloc(sizeof(STqMetaList)); if(pNewNode == NULL) { //TODO: memory error return -1; } - memset(pNewNode, 0, sizeof(TqMetaList)); + memset(pNewNode, 0, sizeof(STqMetaList)); pNewNode->handle.key = key; pNewNode->handle.valueInTxn = value; pNewNode->next = pMeta->bucket[bucketKey]; @@ -520,11 +519,11 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val return 0; } -int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { +int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return tqHandlePutImpl(pMeta, key, value); } -int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { +int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { void *vmem = malloc(vsize); if(vmem == NULL) { //TODO: memory error @@ -534,9 +533,9 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi return tqHandlePutImpl(pMeta, key, vmem); } -static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { +static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { if(pNode->handle.valueInTxn != NULL @@ -552,9 +551,9 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { return NULL; } -int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { +int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { if(pNode->handle.valueInTxn == NULL) { @@ -575,9 +574,9 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { return -2; } -int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { +int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { if(pNode->handle.valueInTxn) { @@ -596,9 +595,9 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { return -2; } -int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { +int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; + STqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn) { @@ -616,6 +615,6 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { } //TODO: clean deleted idx and data from persistent file -int32_t tqStoreCompact(TqMetaStore *pMeta) { +int32_t tqStoreCompact(STqMetaStore *pMeta) { return 0; } diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 48142878c3..819afcc411 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -105,6 +105,10 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { return taosCalcChecksum(0, (uint8_t*)body, len); } +static inline int64_t walGetVerIdxOffset(SWal* pWal, int64_t ver) { + return (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); +} + static inline void walResetVer(SWalVer* pVer) { pVer->firstVer = -1; pVer->verInSnapshotting = -1; @@ -117,6 +121,10 @@ int walLoadMeta(SWal* pWal); int walSaveMeta(SWal* pWal); int walRollFileInfo(SWal* pWal); +int walCheckAndRepairMeta(SWal* pWal); + +int walCheckAndRepairIdx(SWal* pWal); + char* walMetaSerialize(SWal* pWal); int walMetaDeserialize(SWal* pWal, const char* bytes); //meta section end diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index aa592b4fe8..e2715d51f1 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -40,6 +40,47 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } +int walCheckAndRepairMeta(SWal* pWal) { + // load log files, get first/snapshot/last version info + const char* logPattern = "^[0-9]+.log$"; + const char* idxPattern = "^[0-9]+.idx$"; + regex_t logRegPattern; + regex_t idxRegPattern; + SArray* pLogArray = taosArrayInit(8, sizeof(int64_t)); + + regcomp(&logRegPattern, logPattern, REG_EXTENDED); + regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); + + DIR *dir = opendir(pWal->path); + if(dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent* ent; + while((ent = readdir(dir)) != NULL) { + char *name = basename(ent->d_name); + int code = regexec(&logRegPattern, name, 0, NULL, 0); + if(code == 0) { + int64_t firstVer; + sscanf(name, "%" PRId64 ".log", &firstVer); + taosArrayPush(pLogArray, &firstVer); + } + } + + + // load meta + // if not match, or meta missing + // rebuild meta + return 0; +} + +int walCheckAndRepairIdx(SWal* pWal) { + // iterate all idx files + // check first and last entry of each idx file valid + return 0; +} + int walRollFileInfo(SWal* pWal) { int64_t ts = taosGetTimestampSec(); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 9efeb83cf0..06df3eb3be 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -90,6 +90,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } //open meta + walResetVer(&pWal->vers); pWal->writeLogTfd = -1; pWal->writeIdxTfd = -1; pWal->writeCur = -1; @@ -101,7 +102,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } //init status - walResetVer(&pWal->vers); pWal->totSize = 0; pWal->lastRollSeq = -1; @@ -123,7 +123,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } - if(walLoadMeta(pWal) < 0) { + if(walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) { taosRemoveRef(tsWal.refSetId, pWal->refId); pthread_mutex_destroy(&pWal->mutex); taosArrayDestroy(pWal->fileInfoSet); @@ -131,6 +131,10 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } + if(walCheckAndRepairIdx(pWal) < 0) { + + } + wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); return pWal; diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 7db5b90c1d..769b2fd2e4 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -27,20 +27,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int64_t logTfd = pWal->writeLogTfd; //seek position - int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); - code = tfLseek(idxTfd, offset, SEEK_SET); - if(code != 0) { - return -1; - } - int64_t readBuf[2]; - code = tfRead(idxTfd, readBuf, sizeof(readBuf)); + int64_t idxOff = walGetVerIdxOffset(pWal, ver); + code = tfLseek(idxTfd, idxOff, SEEK_SET); if(code != 0) { return -1; } + WalIdxEntry entry; //TODO:deserialize - ASSERT(readBuf[0] == ver); - code = tfLseek(logTfd, readBuf[1], SEEK_CUR); - if (code != 0) { + code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry)); + if(code != 0) { + return -1; + } + ASSERT(entry.ver == ver); + code = tfLseek(logTfd, entry.offset, SEEK_CUR); + if (code < 0) { return -1; } return code; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index ffbb19c6b7..67b70f249d 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -68,13 +68,12 @@ int32_t walRollback(SWal *pWal, int64_t ver) { walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); int64_t idxTfd = tfOpenReadWrite(fnameStr); - //change to deserialize function - + //TODO:change to deserialize function if(idxTfd < 0) { pthread_mutex_unlock(&pWal->mutex); return -1; } - int idxOff = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); + int64_t idxOff = walGetVerIdxOffset(pWal, ver); code = tfLseek(idxTfd, idxOff, SEEK_SET); if(code < 0) { pthread_mutex_unlock(&pWal->mutex); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4627377a8e..85f961b829 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5142,22 +5142,22 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = 0; - pInfo->order = pRuntimeEnv->pQueryAttr->order.order; - pInfo->current = 0; + pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->times = repeatTime; + pInfo->reverseTimes = 0; + pInfo->order = pRuntimeEnv->pQueryAttr->order.order; + pInfo->current = 0; // pInfo->prevGroupId = -1; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableScanOperator"; - pOperator->operatorType = OP_TableScan; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doTableScan; + pOperator->name = "TableScanOperator"; + pOperator->operatorType = OP_TableScan; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doTableScan; return pOperator; } @@ -5174,14 +5174,14 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pRuntimeEnv->enableGroupData = true; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableSeqScanOperator"; - pOperator->operatorType = OP_TableSeqScan; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doTableScanImpl; + pOperator->name = "TableSeqScanOperator"; + pOperator->operatorType = OP_TableSeqScan; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doTableScanImpl; return pOperator; } @@ -5199,14 +5199,14 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu taosArrayPush(pInfo->block.pDataBlock, &infoData); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableBlockInfoScanOperator"; - pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->exec = doBlockInfoScan; + pOperator->name = "TableBlockInfoScanOperator"; + pOperator->operatorType = OP_TableBlockInfoScan; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->exec = doBlockInfoScan; return pOperator; } @@ -5271,11 +5271,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->current = 0; - pInfo->order = pRuntimeEnv->pQueryAttr->order.order; + pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->current = 0; + pInfo->order = pRuntimeEnv->pQueryAttr->order.order; SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "DataBlocksOptimizedScanOperator"; @@ -5429,14 +5429,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "GlobalAggregate"; - pOperator->operatorType = OP_GlobalAggregate; - pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->name = "GlobalAggregate"; + pOperator->operatorType = OP_GlobalAggregate; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = doGlobalAggregate; pOperator->cleanup = destroyGlobalAggOperatorInfo; @@ -5473,16 +5473,16 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "MultiwaySortOperator"; - pOperator->operatorType = OP_MultiwayMergeSort; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->numOfOutput = numOfOutput; - pOperator->pExpr = pExpr; - pOperator->exec = doMultiwayMergeSort; - pOperator->cleanup = destroyGlobalAggOperatorInfo; + pOperator->name = "MultiwaySortOperator"; + pOperator->operatorType = OP_MultiwayMergeSort; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->numOfOutput = numOfOutput; + pOperator->pExpr = pExpr; + pOperator->exec = doMultiwayMergeSort; + pOperator->cleanup = destroyGlobalAggOperatorInfo; return pOperator; } @@ -6543,14 +6543,14 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableAggregate"; - pOperator->operatorType = OP_Aggregate; - pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->name = "TableAggregate"; + pOperator->operatorType = OP_Aggregate; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = doAggregate; pOperator->cleanup = destroyAggOperatorInfo; @@ -6638,14 +6638,14 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "MultiTableAggregate"; - pOperator->operatorType = OP_MultiTableAggregate; - pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->name = "MultiTableAggregate"; + pOperator->operatorType = OP_MultiTableAggregate; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = doSTableAggregate; pOperator->cleanup = destroyAggOperatorInfo; @@ -6668,14 +6668,14 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "ProjectOperator"; - pOperator->operatorType = OP_Project; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->name = "ProjectOperator"; + pOperator->operatorType = OP_Project; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = doProjectOperation; pOperator->cleanup = destroyProjectOperatorInfo; @@ -6920,16 +6920,16 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "GroupbyAggOperator"; - pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; - pOperator->operatorType = OP_Groupby; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = hashGroupbyAggregate; - pOperator->cleanup = destroyGroupbyOperatorInfo; + pOperator->name = "GroupbyAggOperator"; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->operatorType = OP_Groupby; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = hashGroupbyAggregate; + pOperator->cleanup = destroyGroupbyOperatorInfo; appendUpstream(pOperator, upstream); return pOperator; @@ -7163,16 +7163,16 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf pInfo->curPos = 0; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "SeqTableTagScan"; - pOperator->operatorType = OP_TagScan; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->exec = doTagScan; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->cleanup = destroyTagScanOperatorInfo; + pOperator->name = "SeqTableTagScan"; + pOperator->operatorType = OP_TagScan; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->exec = doTagScan; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->cleanup = destroyTagScanOperatorInfo; return pOperator; } @@ -7302,17 +7302,17 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "DistinctOperator"; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->operatorType = OP_Distinct; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = hashDistinct; - pOperator->pExpr = pExpr; - pOperator->cleanup = destroyDistinctOperatorInfo; + pOperator->name = "DistinctOperator"; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->operatorType = OP_Distinct; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = hashDistinct; + pOperator->pExpr = pExpr; + pOperator->cleanup = destroyDistinctOperatorInfo; appendUpstream(pOperator, upstream); return pOperator; @@ -7587,20 +7587,20 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { param->pExpr[i] = pExprMsg; - pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); - pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); - pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); - pExprMsg->colBytes = htons(pExprMsg->colBytes); - pExprMsg->colType = htons(pExprMsg->colType); + pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); + pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); + pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->colBytes = htons(pExprMsg->colBytes); + pExprMsg->colType = htons(pExprMsg->colType); - pExprMsg->resType = htons(pExprMsg->resType); - pExprMsg->resBytes = htons(pExprMsg->resBytes); - pExprMsg->interBytes = htonl(pExprMsg->interBytes); + pExprMsg->resType = htons(pExprMsg->resType); + pExprMsg->resBytes = htons(pExprMsg->resBytes); + pExprMsg->interBytes = htonl(pExprMsg->interBytes); - pExprMsg->functionId = htons(pExprMsg->functionId); - pExprMsg->numOfParams = htons(pExprMsg->numOfParams); - pExprMsg->resColId = htons(pExprMsg->resColId); - pExprMsg->flist.numOfFilters = htons(pExprMsg->flist.numOfFilters); + pExprMsg->functionId = htons(pExprMsg->functionId); + pExprMsg->numOfParams = htons(pExprMsg->numOfParams); + pExprMsg->resColId = htons(pExprMsg->resColId); + pExprMsg->flist.numOfFilters = htons(pExprMsg->flist.numOfFilters); pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { @@ -7639,15 +7639,15 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { param->pSecExpr[i] = pExprMsg; pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); - pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); - pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); - pExprMsg->resType = htons(pExprMsg->resType); - pExprMsg->resBytes = htons(pExprMsg->resBytes); - pExprMsg->colBytes = htons(pExprMsg->colBytes); - pExprMsg->colType = htons(pExprMsg->colType); + pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); + pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->resType = htons(pExprMsg->resType); + pExprMsg->resBytes = htons(pExprMsg->resBytes); + pExprMsg->colBytes = htons(pExprMsg->colBytes); + pExprMsg->colType = htons(pExprMsg->colType); - pExprMsg->functionId = htons(pExprMsg->functionId); - pExprMsg->numOfParams = htons(pExprMsg->numOfParams); + pExprMsg->functionId = htons(pExprMsg->functionId); + pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pMsg += sizeof(SSqlExpr); @@ -8422,40 +8422,40 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S SQueryAttr* pQueryAttr = &pQInfo->query; pQInfo->runtimeEnv.pQueryAttr = pQueryAttr; - pQueryAttr->tableGroupInfo = *pTableGroupInfo; - pQueryAttr->numOfCols = numOfCols; - pQueryAttr->numOfOutput = numOfOutput; - pQueryAttr->limit.limit = pQueryMsg->limit; - pQueryAttr->limit.offset = pQueryMsg->offset; - pQueryAttr->order.order = pQueryMsg->order; - pQueryAttr->order.orderColId = pQueryMsg->orderColId; - pQueryAttr->pExpr1 = pExprs; - pQueryAttr->pExpr2 = pSecExprs; - pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput; - pQueryAttr->pGroupbyExpr = pGroupbyExpr; + pQueryAttr->tableGroupInfo = *pTableGroupInfo; + pQueryAttr->numOfCols = numOfCols; + pQueryAttr->numOfOutput = numOfOutput; + pQueryAttr->limit.limit = pQueryMsg->limit; + pQueryAttr->limit.offset = pQueryMsg->offset; + pQueryAttr->order.order = pQueryMsg->order; + pQueryAttr->order.orderColId = pQueryMsg->orderColId; + pQueryAttr->pExpr1 = pExprs; + pQueryAttr->pExpr2 = pSecExprs; + pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput; + pQueryAttr->pGroupbyExpr = pGroupbyExpr; memcpy(&pQueryAttr->interval, &pQueryMsg->interval, sizeof(pQueryAttr->interval)); - pQueryAttr->fillType = pQueryMsg->fillType; - pQueryAttr->numOfTags = pQueryMsg->numOfTags; - pQueryAttr->tagColList = pTagCols; + pQueryAttr->fillType = pQueryMsg->fillType; + pQueryAttr->numOfTags = pQueryMsg->numOfTags; + pQueryAttr->tagColList = pTagCols; pQueryAttr->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit; - pQueryAttr->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; - pQueryAttr->sw = pQueryMsg->sw; - pQueryAttr->vgId = vgId; + pQueryAttr->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; + pQueryAttr->sw = pQueryMsg->sw; + pQueryAttr->vgId = vgId; - pQueryAttr->stableQuery = pQueryMsg->stableQuery; - pQueryAttr->topBotQuery = pQueryMsg->topBotQuery; - pQueryAttr->groupbyColumn = pQueryMsg->groupbyColumn; - pQueryAttr->hasTagResults = pQueryMsg->hasTagResults; + pQueryAttr->stableQuery = pQueryMsg->stableQuery; + pQueryAttr->topBotQuery = pQueryMsg->topBotQuery; + pQueryAttr->groupbyColumn = pQueryMsg->groupbyColumn; + pQueryAttr->hasTagResults = pQueryMsg->hasTagResults; pQueryAttr->timeWindowInterpo = pQueryMsg->timeWindowInterpo; - pQueryAttr->queryBlockDist = pQueryMsg->queryBlockDist; - pQueryAttr->stabledev = pQueryMsg->stabledev; - pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery; - pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; - pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; - pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; - pQueryAttr->stateWindow = pQueryMsg->stateWindow; - pQueryAttr->vgId = vgId; - pQueryAttr->pFilters = pFilters; + pQueryAttr->queryBlockDist = pQueryMsg->queryBlockDist; + pQueryAttr->stabledev = pQueryMsg->stabledev; + pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery; + pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; + pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; + pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; + pQueryAttr->stateWindow = pQueryMsg->stateWindow; + pQueryAttr->vgId = vgId; + pQueryAttr->pFilters = pFilters; pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQueryAttr->tableCols == NULL) {