Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_new_snapshot
This commit is contained in:
commit
efb429ebd3
|
@ -2,7 +2,7 @@
|
||||||
IF (DEFINED VERNUMBER)
|
IF (DEFINED VERNUMBER)
|
||||||
SET(TD_VER_NUMBER ${VERNUMBER})
|
SET(TD_VER_NUMBER ${VERNUMBER})
|
||||||
ELSE ()
|
ELSE ()
|
||||||
SET(TD_VER_NUMBER "3.0.0.1")
|
SET(TD_VER_NUMBER "3.0.0.2")
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
IF (DEFINED VERCOMPATIBLE)
|
IF (DEFINED VERCOMPATIBLE)
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
# taosws-rs
|
# taosws-rs
|
||||||
ExternalProject_Add(taosws-rs
|
ExternalProject_Add(taosws-rs
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
|
||||||
GIT_TAG 6fc47d7
|
GIT_TAG 0609b50
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -1592,6 +1592,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
colDataAppend(pColInfo, rows, buf, false);
|
colDataAppend(pColInfo, rows, buf, false);
|
||||||
|
} else if (i == 1) {
|
||||||
|
colDataAppend(pColInfo, rows, (const char *)&pDb->createdTime, false);
|
||||||
} else if (i == 3) {
|
} else if (i == 3) {
|
||||||
colDataAppend(pColInfo, rows, (const char *)&numOfTables, false);
|
colDataAppend(pColInfo, rows, (const char *)&numOfTables, false);
|
||||||
} else if (i == 14) {
|
} else if (i == 14) {
|
||||||
|
|
|
@ -211,6 +211,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
|
||||||
void tdRSmaFSClose(SRSmaFS *fs);
|
void tdRSmaFSClose(SRSmaFS *fs);
|
||||||
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
||||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
||||||
|
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat);
|
||||||
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
|
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
|
||||||
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
|
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
|
||||||
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||||
|
|
|
@ -643,11 +643,20 @@ typedef struct {
|
||||||
TSDBROW row;
|
TSDBROW row;
|
||||||
} SRowInfo;
|
} SRowInfo;
|
||||||
|
|
||||||
|
typedef struct SSttBlockLoadInfo {
|
||||||
|
SBlockData blockData[2];
|
||||||
|
SArray *aSttBlk;
|
||||||
|
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||||
|
int32_t currentLoadBlockIndex;
|
||||||
|
} SSttBlockLoadInfo;
|
||||||
|
|
||||||
typedef struct SMergeTree {
|
typedef struct SMergeTree {
|
||||||
int8_t backward;
|
int8_t backward;
|
||||||
SRBTree rbt;
|
SRBTree rbt;
|
||||||
SArray *pIterList;
|
SArray *pIterList;
|
||||||
SLDataIter *pIter;
|
SLDataIter *pIter;
|
||||||
|
bool destroyLoadInfo;
|
||||||
|
SSttBlockLoadInfo* pLoadInfo;
|
||||||
} SMergeTree;
|
} SMergeTree;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -657,12 +666,16 @@ typedef struct {
|
||||||
} SSkmInfo;
|
} SSkmInfo;
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange);
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo);
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||||
void tMergeTreeClose(SMergeTree *pMTree);
|
void tMergeTreeClose(SMergeTree *pMTree);
|
||||||
|
|
||||||
|
SSttBlockLoadInfo* tCreateLastBlockLoadInfo();
|
||||||
|
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
|
||||||
|
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
|
||||||
|
|
||||||
// ========== inline functions ==========
|
// ========== inline functions ==========
|
||||||
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||||
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
||||||
|
|
|
@ -182,6 +182,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SRSmaFS *pFS = RSMA_FS(pStat);
|
SRSmaFS *pFS = RSMA_FS(pStat);
|
||||||
int64_t committed = pStat->commitAppliedVer;
|
int64_t committed = pStat->commitAppliedVer;
|
||||||
|
int64_t fsMaxVer = -1;
|
||||||
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
taosWLockLatch(RSMA_FS_LOCK(pStat));
|
taosWLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
@ -204,10 +205,20 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
||||||
++i;
|
++i;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
|
if (taosArrayGetSize(pFS->aQTaskInf) > 0) {
|
||||||
if (tdRSmaFSUpsertQTaskFile(pFS, &qFile) < 0) {
|
fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version;
|
||||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
}
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
|
if (fsMaxVer < committed) {
|
||||||
|
SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
|
||||||
|
if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) {
|
||||||
|
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode),
|
||||||
|
committed, fsMaxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
@ -365,7 +376,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||||
|
|
||||||
// step 1: merge qTaskInfo and iQTaskInfo
|
// step 1: merge qTaskInfo and iQTaskInfo
|
||||||
// lock
|
// lock
|
||||||
|
|
|
@ -49,7 +49,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
|
||||||
int32_t vid = 0;
|
int32_t vid = 0;
|
||||||
int64_t version = -1;
|
int64_t version = -1;
|
||||||
sscanf((const char *)taosArrayGetP(output, i), "v%dqinfo.v%" PRIi64, &vid, &version);
|
sscanf((const char *)taosArrayGetP(output, i), "v%dqinf.v%" PRIi64, &vid, &version);
|
||||||
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
|
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
|
||||||
if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) {
|
if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -96,6 +96,18 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
||||||
return oldVal;
|
return oldVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) {
|
||||||
|
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
|
||||||
|
int64_t version = -1;
|
||||||
|
|
||||||
|
taosRLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
if (taosArrayGetSize(aQTaskInf) > 0) {
|
||||||
|
version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version;
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
|
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
|
||||||
|
|
|
@ -1342,29 +1342,31 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) {
|
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t infoLen = 0;
|
int32_t infoLen = 0;
|
||||||
taosDecodeFixedI32(pIter->qBuf, &infoLen);
|
taosDecodeFixedI32(pIter->pBuf, &infoLen);
|
||||||
if (infoLen > nBytes) {
|
if (infoLen > nBytes) {
|
||||||
if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
|
if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
|
||||||
terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
|
terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
|
||||||
smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
pIter->nAlloc = infoLen;
|
if (pIter->nAlloc < infoLen) {
|
||||||
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
|
pIter->nAlloc = infoLen;
|
||||||
if (!pBuf) {
|
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
if (!pBuf) {
|
||||||
return TSDB_CODE_FAILED;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
pIter->pBuf = pBuf;
|
||||||
}
|
}
|
||||||
pIter->pBuf = pBuf;
|
|
||||||
pIter->qBuf = pIter->pBuf;
|
|
||||||
nBytes = infoLen;
|
nBytes = infoLen;
|
||||||
|
|
||||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) {
|
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1373,6 +1375,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pIter->qBuf = pIter->pBuf;
|
||||||
pIter->offset += nBytes;
|
pIter->offset += nBytes;
|
||||||
pIter->nBytes = nBytes;
|
pIter->nBytes = nBytes;
|
||||||
pIter->nBufPos = 0;
|
pIter->nBufPos = 0;
|
||||||
|
@ -1450,17 +1453,24 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
|
||||||
|
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
|
||||||
|
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
|
||||||
|
pRSmaStat->commitAppliedVer, fsMaxVer);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
STFile tFile = {0};
|
STFile tFile = {0};
|
||||||
#if 0
|
#if 0
|
||||||
if (pRSmaStat->commitAppliedVer > 0) {
|
if (pRSmaStat->commitAppliedVer > 0) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
|
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
|
||||||
|
@ -1510,11 +1520,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
|
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
|
||||||
|
@ -1558,7 +1568,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
smaError("vgId:%d, rsma persit failed since %s", vid, terrstr());
|
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
|
||||||
if (isFileCreated) {
|
if (isFileCreated) {
|
||||||
tdRemoveTFile(&tFile);
|
tdRemoveTFile(&tFile);
|
||||||
tdDestroyTFile(&tFile);
|
tdDestroyTFile(&tFile);
|
||||||
|
|
|
@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
|
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX});
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL);
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
|
|
|
@ -22,26 +22,106 @@ struct SLDataIter {
|
||||||
SDataFReader *pReader;
|
SDataFReader *pReader;
|
||||||
int32_t iStt;
|
int32_t iStt;
|
||||||
int8_t backward;
|
int8_t backward;
|
||||||
SArray *aSttBlk;
|
|
||||||
int32_t iSttBlk;
|
int32_t iSttBlk;
|
||||||
SBlockData bData[2];
|
|
||||||
int32_t loadIndex;
|
|
||||||
int32_t iRow;
|
int32_t iRow;
|
||||||
SRowInfo rInfo;
|
SRowInfo rInfo;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
STimeWindow timeWindow;
|
STimeWindow timeWindow;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
|
|
||||||
|
SSttBlockLoadInfo* pBlockLoadInfo;
|
||||||
};
|
};
|
||||||
|
|
||||||
static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; }
|
SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
|
||||||
|
SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
|
||||||
|
if (pLoadInfo == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static SBlockData *getNextBlock(SLDataIter *pIter) {
|
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
||||||
pIter->loadIndex ^= 1;
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
return getCurrentBlock(pIter);
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
|
|
||||||
|
int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]);
|
||||||
|
if (code) {
|
||||||
|
terrno = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tBlockDataCreate(&pLoadInfo[i].blockData[1]);
|
||||||
|
if (code) {
|
||||||
|
terrno = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
||||||
|
}
|
||||||
|
|
||||||
|
return pLoadInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
|
||||||
|
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
||||||
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
|
||||||
|
taosArrayClear(pLoadInfo[i].aSttBlk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
|
||||||
|
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
|
||||||
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
|
||||||
|
tBlockDataDestroy(&pLoadInfo[i].blockData[0], true);
|
||||||
|
tBlockDataDestroy(&pLoadInfo[i].blockData[1], true);
|
||||||
|
|
||||||
|
taosArrayDestroy(pLoadInfo[i].aSttBlk);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pLoadInfo);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;
|
||||||
|
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
|
||||||
|
return &pInfo->blockData[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->blockIndex[1] == pIter->iSttBlk) {
|
||||||
|
return &pInfo->blockData[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->currentLoadBlockIndex ^= 1;
|
||||||
|
if (pIter->pSttBlk != NULL) { // current block not loaded yet
|
||||||
|
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]);
|
||||||
|
tsdbDebug("read last block, index:%d, last file index:%d", pIter->iSttBlk, pIter->iStt);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
||||||
|
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||||
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) {
|
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
if (*pIter == NULL) {
|
if (*pIter == NULL) {
|
||||||
|
@ -55,34 +135,22 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
(*pIter)->backward = backward;
|
(*pIter)->backward = backward;
|
||||||
(*pIter)->verRange = *pRange;
|
(*pIter)->verRange = *pRange;
|
||||||
(*pIter)->timeWindow = *pTimeWindow;
|
(*pIter)->timeWindow = *pTimeWindow;
|
||||||
(*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
|
||||||
if ((*pIter)->aSttBlk == NULL) {
|
(*pIter)->pBlockLoadInfo = pBlockLoadInfo;
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
if (taosArrayGetSize(pBlockLoadInfo->aSttBlk) == 0) {
|
||||||
goto _exit;
|
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
|
||||||
|
if (code) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tBlockDataCreate(&(*pIter)->bData[0]);
|
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||||
if (code) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tBlockDataCreate(&(*pIter)->bData[1]);
|
|
||||||
if (code) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbReadSttBlk(pReader, iStt, (*pIter)->aSttBlk);
|
|
||||||
if (code) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t size = taosArrayGetSize((*pIter)->aSttBlk);
|
|
||||||
|
|
||||||
// find the start block
|
// find the start block
|
||||||
int32_t index = -1;
|
int32_t index = -1;
|
||||||
if (!backward) { // asc
|
if (!backward) { // asc
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||||
if (p->suid != suid) {
|
if (p->suid != suid) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -94,7 +162,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
}
|
}
|
||||||
} else { // desc
|
} else { // desc
|
||||||
for (int32_t i = size - 1; i >= 0; --i) {
|
for (int32_t i = size - 1; i >= 0; --i) {
|
||||||
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||||
if (p->suid != suid) {
|
if (p->suid != suid) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -108,7 +176,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
|
|
||||||
(*pIter)->iSttBlk = index;
|
(*pIter)->iSttBlk = index;
|
||||||
if (index != -1) {
|
if (index != -1) {
|
||||||
(*pIter)->pSttBlk = taosArrayGet((*pIter)->aSttBlk, (*pIter)->iSttBlk);
|
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
|
||||||
|
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -116,9 +185,6 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
void tLDataIterClose(SLDataIter *pIter) {
|
void tLDataIterClose(SLDataIter *pIter) {
|
||||||
tBlockDataDestroy(&pIter->bData[0], 1);
|
|
||||||
tBlockDataDestroy(&pIter->bData[1], 1);
|
|
||||||
taosArrayDestroy(pIter->aSttBlk);
|
|
||||||
taosMemoryFree(pIter);
|
taosMemoryFree(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,9 +193,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
pIter->iSttBlk += step;
|
pIter->iSttBlk += step;
|
||||||
|
|
||||||
int32_t index = -1;
|
int32_t index = -1;
|
||||||
size_t size = taosArrayGetSize(pIter->aSttBlk);
|
size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
|
||||||
for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
|
for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
|
||||||
SSttBlk *p = taosArrayGet(pIter->aSttBlk, i);
|
SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
|
||||||
if ((!pIter->backward) && p->minUid > pIter->uid) {
|
if ((!pIter->backward) && p->minUid > pIter->uid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -169,7 +235,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
pIter->pSttBlk = NULL;
|
pIter->pSttBlk = NULL;
|
||||||
} else {
|
} else {
|
||||||
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,7 +244,8 @@ static void findNextValidRow(SLDataIter *pIter) {
|
||||||
|
|
||||||
bool hasVal = false;
|
bool hasVal = false;
|
||||||
int32_t i = pIter->iRow;
|
int32_t i = pIter->iRow;
|
||||||
SBlockData *pBlockData = getCurrentBlock(pIter);
|
|
||||||
|
SBlockData *pBlockData = loadBlockIfMissing(pIter);
|
||||||
|
|
||||||
for (; i < pBlockData->nRow && i >= 0; i += step) {
|
for (; i < pBlockData->nRow && i >= 0; i += step) {
|
||||||
if (pBlockData->aUid != NULL) {
|
if (pBlockData->aUid != NULL) {
|
||||||
|
@ -238,19 +305,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t iBlockL = pIter->iSttBlk;
|
int32_t iBlockL = pIter->iSttBlk;
|
||||||
SBlockData *pBlockData = getCurrentBlock(pIter);
|
SBlockData *pBlockData = loadBlockIfMissing(pIter);
|
||||||
|
|
||||||
if (pBlockData->nRow == 0 && pIter->pSttBlk != NULL) { // current block not loaded yet
|
|
||||||
pBlockData = getNextBlock(pIter);
|
|
||||||
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->iRow += step;
|
pIter->iRow += step;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -266,12 +322,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (iBlockL != pIter->iSttBlk) {
|
if (iBlockL != pIter->iSttBlk) {
|
||||||
pBlockData = getNextBlock(pIter);
|
pBlockData = loadBlockIfMissing(pIter);
|
||||||
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
|
pIter->iRow += step;
|
||||||
if (code) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,7 +365,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange) {
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo) {
|
||||||
pMTree->backward = backward;
|
pMTree->backward = backward;
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -322,21 +374,33 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
}
|
}
|
||||||
|
|
||||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SSttBlockLoadInfo* pLoadInfo = NULL;
|
||||||
|
if (pBlockLoadInfo == NULL) {
|
||||||
|
if (pMTree->pLoadInfo == NULL) {
|
||||||
|
pMTree->destroyLoadInfo = true;
|
||||||
|
pMTree->pLoadInfo = tCreateLastBlockLoadInfo();
|
||||||
|
}
|
||||||
|
|
||||||
|
pLoadInfo = pMTree->pLoadInfo;
|
||||||
|
} else {
|
||||||
|
pLoadInfo = pBlockLoadInfo;
|
||||||
|
}
|
||||||
|
|
||||||
struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0};
|
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange);
|
struct SLDataIter* pIter = NULL;
|
||||||
|
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasVal = tLDataIterNextRow(pIterList[i]);
|
bool hasVal = tLDataIterNextRow(pIter);
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
taosArrayPush(pMTree->pIterList, &pIterList[i]);
|
taosArrayPush(pMTree->pIterList, &pIter);
|
||||||
tMergeTreeAddIter(pMTree, pIterList[i]);
|
tMergeTreeAddIter(pMTree, pIter);
|
||||||
} else {
|
} else {
|
||||||
tLDataIterClose(pIterList[i]);
|
tLDataIterClose(pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,4 +457,9 @@ void tMergeTreeClose(SMergeTree *pMTree) {
|
||||||
|
|
||||||
pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
|
pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
|
|
||||||
|
if (pMTree->destroyLoadInfo) {
|
||||||
|
pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
|
||||||
|
pMTree->destroyLoadInfo = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,6 @@
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
|
|
||||||
#define INITIAL_ROW_INDEX_VAL (-1)
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
EXTERNAL_ROWS_PREV = 0x1,
|
EXTERNAL_ROWS_PREV = 0x1,
|
||||||
|
@ -88,6 +86,7 @@ typedef struct SLastBlockReader {
|
||||||
int32_t order;
|
int32_t order;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
SMergeTree mergeTree;
|
SMergeTree mergeTree;
|
||||||
|
SSttBlockLoadInfo* pInfo;
|
||||||
} SLastBlockReader;
|
} SLastBlockReader;
|
||||||
|
|
||||||
typedef struct SFilesetIter {
|
typedef struct SFilesetIter {
|
||||||
|
@ -226,13 +225,14 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReader->order)? 1:-1;
|
|
||||||
for (int32_t j = 0; j < numOfTables; ++j) {
|
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||||
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
||||||
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||||
info.lastKey = pTsdbReader->window.skey - step;
|
int64_t skey = pTsdbReader->window.skey;
|
||||||
|
info.lastKey = (skey > INT64_MIN)? (skey - 1):skey;
|
||||||
} else {
|
} else {
|
||||||
info.lastKey = pTsdbReader->window.ekey - step;
|
int64_t ekey = pTsdbReader->window.ekey;
|
||||||
|
info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
||||||
|
@ -319,8 +319,7 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
|
||||||
}
|
}
|
||||||
|
|
||||||
// init file iterator
|
// init file iterator
|
||||||
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
|
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
|
||||||
STsdbReader* pReader /*int32_t order, const char* idstr*/) {
|
|
||||||
size_t numOfFileset = taosArrayGetSize(aDFileSet);
|
size_t numOfFileset = taosArrayGetSize(aDFileSet);
|
||||||
|
|
||||||
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
|
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
|
||||||
|
@ -345,6 +344,14 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
|
||||||
pLReader->uid = 0;
|
pLReader->uid = 0;
|
||||||
tMergeTreeClose(&pLReader->mergeTree);
|
tMergeTreeClose(&pLReader->mergeTree);
|
||||||
|
|
||||||
|
if (pLReader->pInfo == NULL) {
|
||||||
|
pLReader->pInfo = tCreateLastBlockLoadInfo();
|
||||||
|
if (pLReader->pInfo == NULL) {
|
||||||
|
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
|
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -360,6 +367,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
|
||||||
|
|
||||||
pIter->pLastBlockReader->uid = 0;
|
pIter->pLastBlockReader->uid = 0;
|
||||||
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
|
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
|
||||||
|
resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);
|
||||||
|
|
||||||
// check file the time range of coverage
|
// check file the time range of coverage
|
||||||
STimeWindow win = {0};
|
STimeWindow win = {0};
|
||||||
|
@ -1377,7 +1385,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
|
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
|
||||||
bool mergeBlockData) {
|
bool mergeBlockData) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
// SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
|
||||||
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
|
|
||||||
STSRow* pTSRow = NULL;
|
STSRow* pTSRow = NULL;
|
||||||
|
@ -1866,36 +1873,35 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||||
STsdbReader* pReader) {
|
|
||||||
// the last block reader has been initialized for this table.
|
// the last block reader has been initialized for this table.
|
||||||
if (pLastBlockReader->uid == pBlockScanInfo->uid) {
|
if (pLBlockReader->uid == pScanInfo->uid) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pLastBlockReader->uid != 0) {
|
if (pLBlockReader->uid != 0) {
|
||||||
tMergeTreeClose(&pLastBlockReader->mergeTree);
|
tMergeTreeClose(&pLBlockReader->mergeTree);
|
||||||
}
|
}
|
||||||
|
|
||||||
initMemDataIterator(pBlockScanInfo, pReader);
|
initMemDataIterator(pScanInfo, pReader);
|
||||||
pLastBlockReader->uid = pBlockScanInfo->uid;
|
pLBlockReader->uid = pScanInfo->uid;
|
||||||
|
|
||||||
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1;
|
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1;
|
||||||
STimeWindow w = pLastBlockReader->window;
|
STimeWindow w = pLBlockReader->window;
|
||||||
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
|
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
|
||||||
w.skey = pBlockScanInfo->lastKey + step;
|
w.skey = pScanInfo->lastKey + step;
|
||||||
} else {
|
} else {
|
||||||
w.ekey = pBlockScanInfo->lastKey + step;
|
w.ekey = pScanInfo->lastKey + step;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
||||||
pReader->suid, pBlockScanInfo->uid, &w, &pLastBlockReader->verRange);
|
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return nextRowFromLastBlocks(pLastBlockReader, pBlockScanInfo);
|
return nextRowFromLastBlocks(pLBlockReader, pScanInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
||||||
|
@ -3305,6 +3311,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
||||||
if (pFilesetIter->pLastBlockReader != NULL) {
|
if (pFilesetIter->pLastBlockReader != NULL) {
|
||||||
tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree);
|
tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree);
|
||||||
|
pFilesetIter->pLastBlockReader->pInfo = destroyLastBlockLoadInfo(pFilesetIter->pLastBlockReader->pInfo);
|
||||||
taosMemoryFree(pFilesetIter->pLastBlockReader);
|
taosMemoryFree(pFilesetIter->pLastBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2795,6 +2795,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
||||||
ASSERT(pInputCol->hasNull == true);
|
ASSERT(pInputCol->hasNull == true);
|
||||||
|
// save selectivity value for column consisted of all null values
|
||||||
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2871,7 +2873,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
if (numOfElems == 0) {
|
||||||
|
// save selectivity value for column consisted of all null values
|
||||||
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
|
}
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2892,6 +2897,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
||||||
ASSERT(pInputCol->hasNull == true);
|
ASSERT(pInputCol->hasNull == true);
|
||||||
|
// save selectivity value for column consisted of all null values
|
||||||
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2952,7 +2959,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
if (numOfElems == 0) {
|
||||||
|
// save selectivity value for column consisted of all null values
|
||||||
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
|
}
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1805,6 +1805,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
||||||
|
|
||||||
if (session->udfUvPipe == NULL) {
|
if (session->udfUvPipe == NULL) {
|
||||||
fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
|
fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
|
||||||
|
taosMemoryFree(session);
|
||||||
return TSDB_CODE_UDF_PIPE_NO_PIPE;
|
return TSDB_CODE_UDF_PIPE_NO_PIPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1823,6 +1824,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
||||||
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
||||||
|
|
||||||
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
||||||
|
//TODO: synchronization refactor between libuv event loop and request thread
|
||||||
if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
|
if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
|
||||||
SClientUvConn *conn = session->udfUvPipe->data;
|
SClientUvConn *conn = session->udfUvPipe->data;
|
||||||
conn->session = NULL;
|
conn->session = NULL;
|
||||||
|
|
|
@ -81,28 +81,28 @@ __compar_fn_t idxGetCompar(int8_t type) {
|
||||||
}
|
}
|
||||||
return getComparFunc(type, 0);
|
return getComparFunc(type, 0);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_LESS_THAN, a, b, type);
|
return tCompare(func, QUERY_LESS_THAN, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_LESS_EQUAL, a, b, type);
|
return tCompare(func, QUERY_LESS_EQUAL, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_GREATER_THAN, a, b, type);
|
return tCompare(func, QUERY_GREATER_THAN, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_GREATER_EQUAL, a, b, type);
|
return tCompare(func, QUERY_GREATER_EQUAL, a, b, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
static TExeCond tCompareContains(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareContains(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_TERM, a, b, type);
|
return tCompare(func, QUERY_TERM, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareEqual(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_TERM, a, b, type);
|
return tCompare(func, QUERY_TERM, a, b, type);
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ typedef struct SIFCtx {
|
||||||
SIndexMetaArg arg;
|
SIndexMetaArg arg;
|
||||||
} SIFCtx;
|
} SIFCtx;
|
||||||
|
|
||||||
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
|
static FORCE_INLINE int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
|
||||||
if (src == OP_TYPE_GREATER_THAN) {
|
if (src == OP_TYPE_GREATER_THAN) {
|
||||||
*dst = QUERY_GREATER_THAN;
|
*dst = QUERY_GREATER_THAN;
|
||||||
} else if (src == OP_TYPE_GREATER_EQUAL) {
|
} else if (src == OP_TYPE_GREATER_EQUAL) {
|
||||||
|
@ -110,10 +110,9 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
|
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
|
||||||
|
|
||||||
static sif_func_t sifNullFunc = NULL;
|
static sif_func_t sifNullFunc = NULL;
|
||||||
|
|
||||||
static void sifFreeParam(SIFParam *param) {
|
static FORCE_INLINE void sifFreeParam(SIFParam *param) {
|
||||||
if (param == NULL) return;
|
if (param == NULL) return;
|
||||||
|
|
||||||
taosArrayDestroy(param->result);
|
taosArrayDestroy(param->result);
|
||||||
|
@ -123,7 +122,7 @@ static void sifFreeParam(SIFParam *param) {
|
||||||
param->pFilter = NULL;
|
param->pFilter = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGetOperParamNum(EOperatorType ty) {
|
static FORCE_INLINE int32_t sifGetOperParamNum(EOperatorType ty) {
|
||||||
if (OP_TYPE_IS_NULL == ty || OP_TYPE_IS_NOT_NULL == ty || OP_TYPE_IS_TRUE == ty || OP_TYPE_IS_NOT_TRUE == ty ||
|
if (OP_TYPE_IS_NULL == ty || OP_TYPE_IS_NOT_NULL == ty || OP_TYPE_IS_TRUE == ty || OP_TYPE_IS_NOT_TRUE == ty ||
|
||||||
OP_TYPE_IS_FALSE == ty || OP_TYPE_IS_NOT_FALSE == ty || OP_TYPE_IS_UNKNOWN == ty ||
|
OP_TYPE_IS_FALSE == ty || OP_TYPE_IS_NOT_FALSE == ty || OP_TYPE_IS_UNKNOWN == ty ||
|
||||||
OP_TYPE_IS_NOT_UNKNOWN == ty || OP_TYPE_MINUS == ty) {
|
OP_TYPE_IS_NOT_UNKNOWN == ty || OP_TYPE_MINUS == ty) {
|
||||||
|
@ -131,14 +130,14 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
|
||||||
}
|
}
|
||||||
return 2;
|
return 2;
|
||||||
}
|
}
|
||||||
static int32_t sifValidOp(EOperatorType ty) {
|
static FORCE_INLINE int32_t sifValidOp(EOperatorType ty) {
|
||||||
if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) ||
|
if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) ||
|
||||||
(ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) {
|
(ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int32_t sifValidColumn(SColumnNode *cn) {
|
static FORCE_INLINE int32_t sifValidColumn(SColumnNode *cn) {
|
||||||
// add more check
|
// add more check
|
||||||
if (cn == NULL) {
|
if (cn == NULL) {
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -149,7 +148,7 @@ static int32_t sifValidColumn(SColumnNode *cn) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SIdxFltStatus rs) {
|
static FORCE_INLINE SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SIdxFltStatus rs) {
|
||||||
// enh rule later
|
// enh rule later
|
||||||
if (type == LOGIC_COND_TYPE_AND) {
|
if (type == LOGIC_COND_TYPE_AND) {
|
||||||
if (ls == SFLT_NOT_INDEX || rs == SFLT_NOT_INDEX) {
|
if (ls == SFLT_NOT_INDEX || rs == SFLT_NOT_INDEX) {
|
||||||
|
@ -167,7 +166,7 @@ static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SI
|
||||||
return SFLT_NOT_INDEX;
|
return SFLT_NOT_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
static FORCE_INLINE int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||||
// covert data From snode;
|
// covert data From snode;
|
||||||
SValueNode *vn = (SValueNode *)node;
|
SValueNode *vn = (SValueNode *)node;
|
||||||
|
|
||||||
|
@ -205,7 +204,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
static FORCE_INLINE int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
SOperatorNode *nd = (SOperatorNode *)node;
|
SOperatorNode *nd = (SOperatorNode *)node;
|
||||||
assert(nodeType(node) == QUERY_NODE_OPERATOR);
|
assert(nodeType(node) == QUERY_NODE_OPERATOR);
|
||||||
SColumnNode *l = (SColumnNode *)nd->pLeft;
|
SColumnNode *l = (SColumnNode *)nd->pLeft;
|
||||||
|
@ -355,30 +354,30 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef int (*Filter)(void *a, void *b, int16_t dtype);
|
typedef int (*FilterFunc)(void *a, void *b, int16_t dtype);
|
||||||
|
|
||||||
int sifGreaterThan(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifGreaterThan(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
|
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
|
||||||
}
|
}
|
||||||
int sifGreaterEqual(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifGreaterEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
|
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
|
||||||
}
|
}
|
||||||
int sifLessEqual(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifLessEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
|
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
|
||||||
}
|
}
|
||||||
int sifLessThan(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifLessThan(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
|
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
|
||||||
}
|
}
|
||||||
int sifEqual(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
//__compar_fn_t func = idxGetCompar(dtype);
|
//__compar_fn_t func = idxGetCompar(dtype);
|
||||||
return (int)tDoCompare(func, QUERY_TERM, a, b);
|
return (int)tDoCompare(func, QUERY_TERM, a, b);
|
||||||
}
|
}
|
||||||
static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
||||||
if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) {
|
if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) {
|
||||||
*reverse = true;
|
*reverse = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -470,8 +469,8 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
|
||||||
indexMultiTermQueryAdd(mtm, tm, qtype);
|
indexMultiTermQueryAdd(mtm, tm, qtype);
|
||||||
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
|
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
|
||||||
} else {
|
} else {
|
||||||
bool reverse;
|
bool reverse;
|
||||||
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
|
FilterFunc filterFunc = sifGetFilterFunc(qtype, &reverse);
|
||||||
|
|
||||||
SMetaFltParam param = {.suid = arg->suid,
|
SMetaFltParam param = {.suid = arg->suid,
|
||||||
.cid = left->colId,
|
.cid = left->colId,
|
||||||
|
@ -498,72 +497,72 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LOWER_THAN;
|
int id = OP_TYPE_LOWER_THAN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LOWER_EQUAL;
|
int id = OP_TYPE_LOWER_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_GREATER_THAN;
|
int id = OP_TYPE_GREATER_THAN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_GREATER_EQUAL;
|
int id = OP_TYPE_GREATER_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_EQUAL;
|
int id = OP_TYPE_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_EQUAL;
|
int id = OP_TYPE_NOT_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_IN;
|
int id = OP_TYPE_IN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_IN;
|
int id = OP_TYPE_NOT_IN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LIKE;
|
int id = OP_TYPE_LIKE;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_LIKE;
|
int id = OP_TYPE_NOT_LIKE;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_MATCH;
|
int id = OP_TYPE_MATCH;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NMATCH;
|
int id = OP_TYPE_NMATCH;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_JSON_CONTAINS;
|
int id = OP_TYPE_JSON_CONTAINS;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) {
|
static FORCE_INLINE int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) {
|
||||||
// return 0
|
// return 0
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
// add more except
|
// add more except
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *status) {
|
static FORCE_INLINE int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *status) {
|
||||||
// impl later
|
// impl later
|
||||||
*status = SFLT_ACCURATE_INDEX;
|
*status = SFLT_ACCURATE_INDEX;
|
||||||
switch (funcId) {
|
switch (funcId) {
|
||||||
|
|
|
@ -502,7 +502,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
|
||||||
i64 offset;
|
i64 offset;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
offset = pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
|
offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
|
||||||
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
|
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -126,22 +126,22 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyHttpClient(SHttpClient* cli) {
|
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
||||||
taosMemoryFree(cli->wbuf);
|
taosMemoryFree(cli->wbuf);
|
||||||
taosMemoryFree(cli->rbuf);
|
taosMemoryFree(cli->rbuf);
|
||||||
taosMemoryFree(cli->addr);
|
taosMemoryFree(cli->addr);
|
||||||
taosMemoryFree(cli);
|
taosMemoryFree(cli);
|
||||||
}
|
}
|
||||||
static void clientCloseCb(uv_handle_t* handle) {
|
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
}
|
}
|
||||||
static void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
buf->base = cli->rbuf;
|
buf->base = cli->rbuf;
|
||||||
buf->len = HTTP_RECV_BUF_SIZE;
|
buf->len = HTTP_RECV_BUF_SIZE;
|
||||||
}
|
}
|
||||||
static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
uError("http-report recv error:%s", uv_err_name(nread));
|
uError("http-report recv error:%s", uv_err_name(nread));
|
||||||
|
@ -173,7 +173,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
|
||||||
uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
||||||
uint32_t ip = taosGetIpv4FromFqdn(server);
|
uint32_t ip = taosGetIpv4FromFqdn(server);
|
||||||
if (ip == 0xffffffff) {
|
if (ip == 0xffffffff) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
|
@ -69,11 +69,9 @@ typedef struct SCliThrd {
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
uv_prepare_t* prepare;
|
uv_prepare_t* prepare;
|
||||||
void* pool; // conn pool
|
void* pool; // conn pool
|
||||||
|
// timer handles
|
||||||
SArray* timerList;
|
SArray* timerList;
|
||||||
|
|
||||||
// msg queue
|
// msg queue
|
||||||
|
|
||||||
queue msg;
|
queue msg;
|
||||||
TdThreadMutex msgMtx;
|
TdThreadMutex msgMtx;
|
||||||
SDelayQueue* delayQueue;
|
SDelayQueue* delayQueue;
|
||||||
|
@ -108,7 +106,7 @@ static void cliReadTimeoutCb(uv_timer_t* handle);
|
||||||
// register timer in each thread to clear expire conn
|
// register timer in each thread to clear expire conn
|
||||||
// static void cliTimeoutCb(uv_timer_t* handle);
|
// static void cliTimeoutCb(uv_timer_t* handle);
|
||||||
// alloc buffer for recv
|
// alloc buffer for recv
|
||||||
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
// callback after recv nbytes from socket
|
// callback after recv nbytes from socket
|
||||||
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
// callback after send data to socket
|
// callback after send data to socket
|
||||||
|
@ -132,10 +130,10 @@ static void cliSend(SCliConn* pConn);
|
||||||
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
||||||
|
|
||||||
// cli util func
|
// cli util func
|
||||||
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
||||||
static void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
||||||
|
|
||||||
static int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
|
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
|
||||||
|
|
||||||
// process data read from server, add decompress etc later
|
// process data read from server, add decompress etc later
|
||||||
static void cliHandleResp(SCliConn* conn);
|
static void cliHandleResp(SCliConn* conn);
|
||||||
|
@ -150,12 +148,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
||||||
cliHandleUpdate};
|
cliHandleUpdate};
|
||||||
|
|
||||||
static void cliSendQuit(SCliThrd* thrd);
|
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
|
||||||
static void destroyUserdata(STransMsg* userdata);
|
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
||||||
|
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
|
||||||
|
|
||||||
static int cliRBChoseIdx(STrans* pTransInst);
|
|
||||||
|
|
||||||
static void destroyCmsg(void* cmsg);
|
|
||||||
static void transDestroyConnCtx(STransConnCtx* ctx);
|
static void transDestroyConnCtx(STransConnCtx* ctx);
|
||||||
// thread obj
|
// thread obj
|
||||||
static SCliThrd* createThrdObj();
|
static SCliThrd* createThrdObj();
|
||||||
|
@ -434,6 +430,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
|
||||||
|
|
||||||
if (pCtx == NULL || pCtx->pSem == NULL) {
|
if (pCtx == NULL || pCtx->pSem == NULL) {
|
||||||
if (transMsg.info.ahandle == NULL) {
|
if (transMsg.info.ahandle == NULL) {
|
||||||
|
if (REQUEST_NO_RESP(&pMsg->msg)) destroyCmsg(pMsg);
|
||||||
once = true;
|
once = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -885,26 +882,23 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
||||||
}
|
}
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
|
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
|
||||||
if (pCvtAddr->cvt == false) {
|
if (pCvtAddr->cvt == false) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) {
|
if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
|
||||||
if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
|
memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN);
|
||||||
memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN);
|
memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
|
||||||
memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
|
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
|
||||||
if (code != 0) return false;
|
if (code != 0) return false;
|
||||||
if (pCtx->retryCnt == 0) return false;
|
if (pCtx->retryCnt == 0) return false;
|
||||||
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
|
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
||||||
int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
|
||||||
if (pMsg == NULL) return -1;
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
memset(pResp, 0, sizeof(STransMsg));
|
memset(pResp, 0, sizeof(STransMsg));
|
||||||
|
@ -980,6 +974,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
|
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
||||||
}
|
}
|
||||||
static void cliAsyncCb(uv_async_t* handle) {
|
static void cliAsyncCb(uv_async_t* handle) {
|
||||||
SAsyncItem* item = handle->data;
|
SAsyncItem* item = handle->data;
|
||||||
|
@ -1128,14 +1124,15 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
return cli;
|
return cli;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyUserdata(STransMsg* userdata) {
|
FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
|
||||||
if (userdata->pCont == NULL) {
|
if (userdata->pCont == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transFreeMsg(userdata->pCont);
|
transFreeMsg(userdata->pCont);
|
||||||
userdata->pCont = NULL;
|
userdata->pCont = NULL;
|
||||||
}
|
}
|
||||||
static void destroyCmsg(void* arg) {
|
|
||||||
|
FORCE_INLINE void destroyCmsg(void* arg) {
|
||||||
SCliMsg* pMsg = arg;
|
SCliMsg* pMsg = arg;
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -1220,7 +1217,7 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int cliRBChoseIdx(STrans* pTransInst) {
|
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
int8_t index = pTransInst->index;
|
int8_t index = pTransInst->index;
|
||||||
if (pTransInst->numOfThreads == 0) {
|
if (pTransInst->numOfThreads == 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1230,7 +1227,7 @@ int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
}
|
}
|
||||||
return index % pTransInst->numOfThreads;
|
return index % pTransInst->numOfThreads;
|
||||||
}
|
}
|
||||||
static void doDelayTask(void* param) {
|
static FORCE_INLINE void doDelayTask(void* param) {
|
||||||
STaskArg* arg = param;
|
STaskArg* arg = param;
|
||||||
SCliMsg* pMsg = arg->param1;
|
SCliMsg* pMsg = arg->param1;
|
||||||
SCliThrd* pThrd = arg->param2;
|
SCliThrd* pThrd = arg->param2;
|
||||||
|
@ -1264,13 +1261,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||||
if (*val != exp) {
|
if (*val != exp) {
|
||||||
*val = newVal;
|
*val = newVal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
||||||
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
|
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1300,7 +1297,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
if (pMsg == NULL || pMsg->ctx == NULL) {
|
if (pMsg == NULL || pMsg->ctx == NULL) {
|
||||||
tTrace("%s conn %p handle resp", pTransInst->label, pConn);
|
tDebug("%s conn %p handle resp", pTransInst->label, pConn);
|
||||||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1402,7 +1399,7 @@ void transUnrefCliHandle(void* handle) {
|
||||||
cliDestroyConn((SCliConn*)handle, true);
|
cliDestroyConn((SCliConn*)handle, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) {
|
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) {
|
||||||
SCliThrd* pThrd = NULL;
|
SCliThrd* pThrd = NULL;
|
||||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
||||||
if (exh == NULL) {
|
if (exh == NULL) {
|
||||||
|
|
|
@ -424,7 +424,7 @@ void transQueueDestroy(STransQueue* queue) {
|
||||||
taosArrayDestroy(queue->q);
|
taosArrayDestroy(queue->q);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
|
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
|
||||||
SDelayTask* arg1 = container_of(a, SDelayTask, node);
|
SDelayTask* arg1 = container_of(a, SDelayTask, node);
|
||||||
SDelayTask* arg2 = container_of(b, SDelayTask, node);
|
SDelayTask* arg2 = container_of(b, SDelayTask, node);
|
||||||
if (arg1->execTime > arg2->execTime) {
|
if (arg1->execTime > arg2->execTime) {
|
||||||
|
|
|
@ -125,17 +125,17 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
|
||||||
static void uvWalkCb(uv_handle_t* handle, void* arg);
|
static void uvWalkCb(uv_handle_t* handle, void* arg);
|
||||||
static void uvFreeCb(uv_handle_t* handle);
|
static void uvFreeCb(uv_handle_t* handle);
|
||||||
|
|
||||||
static void uvStartSendRespImpl(SSvrMsg* smsg);
|
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg);
|
||||||
|
|
||||||
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSvrMsg* msg);
|
static void uvStartSendResp(SSvrMsg* msg);
|
||||||
|
|
||||||
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
||||||
|
|
||||||
static void destroySmsg(SSvrMsg* smsg);
|
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
||||||
// check whether already read complete packet
|
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
||||||
static SSvrConn* createConn(void* hThrd);
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||||
static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
||||||
static void destroyConnRegArg(SSvrConn* conn);
|
|
||||||
|
|
||||||
static int reallocConnRef(SSvrConn* conn);
|
static int reallocConnRef(SSvrConn* conn);
|
||||||
|
|
||||||
|
@ -413,7 +413,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
wb->len = len;
|
wb->len = len;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvStartSendRespImpl(SSvrMsg* smsg) {
|
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
|
||||||
SSvrConn* pConn = smsg->pConn;
|
SSvrConn* pConn = smsg->pConn;
|
||||||
if (pConn->broken) {
|
if (pConn->broken) {
|
||||||
return;
|
return;
|
||||||
|
@ -447,7 +447,7 @@ static void uvStartSendResp(SSvrMsg* smsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroySmsg(SSvrMsg* smsg) {
|
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
|
||||||
if (smsg == NULL) {
|
if (smsg == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -812,7 +812,7 @@ void* transWorkerThread(void* arg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSvrConn* createConn(void* hThrd) {
|
static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
SWorkThrd* pThrd = hThrd;
|
SWorkThrd* pThrd = hThrd;
|
||||||
|
|
||||||
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
||||||
|
@ -842,7 +842,7 @@ static SSvrConn* createConn(void* hThrd) {
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyConn(SSvrConn* conn, bool clear) {
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -854,7 +854,7 @@ static void destroyConn(SSvrConn* conn, bool clear) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void destroyConnRegArg(SSvrConn* conn) {
|
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
|
||||||
if (conn->regArg.init == 1) {
|
if (conn->regArg.init == 1) {
|
||||||
transFreeMsg(conn->regArg.msg.pCont);
|
transFreeMsg(conn->regArg.msg.pCont);
|
||||||
conn->regArg.init = 0;
|
conn->regArg.init = 0;
|
||||||
|
|
|
@ -97,6 +97,7 @@ while $loop <= $loops
|
||||||
endw
|
endw
|
||||||
sql select count(*) from $stb
|
sql select count(*) from $stb
|
||||||
if $data00 != $totalNum then
|
if $data00 != $totalNum then
|
||||||
|
print expect $totalNum , actual: $data00
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
$loop = $loop + 1
|
$loop = $loop + 1
|
||||||
|
|
Loading…
Reference in New Issue