other: merge main

This commit is contained in:
Haojun Liao 2023-02-06 16:02:54 +08:00
commit b53d2669da
26 changed files with 487 additions and 143 deletions

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG c4a567b GIT_TAG 6a2d9fc
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

6
docs/examples/go/go.mod Normal file
View File

@ -0,0 +1,6 @@
module goexample
go 1.17
require github.com/taosdata/driver-go/v3 v3.1.0

View File

@ -218,6 +218,7 @@ bool fmIsKeepOrderFunc(int32_t funcId);
bool fmIsCumulativeFunc(int32_t funcId); bool fmIsCumulativeFunc(int32_t funcId);
bool fmIsInterpPseudoColumnFunc(int32_t funcId); bool fmIsInterpPseudoColumnFunc(int32_t funcId);
bool fmIsGroupKeyFunc(int32_t funcId); bool fmIsGroupKeyFunc(int32_t funcId);
bool fmIsBlockDistFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType); void getLastCacheDataType(SDataType* pType);

View File

@ -1402,8 +1402,6 @@ int32_t doProcessMsgFromServer(void* param) {
tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
} }
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
} }
} }
@ -1423,6 +1421,11 @@ int32_t doProcessMsgFromServer(void* param) {
} }
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
if (pTscObj) {
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
}
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo); destroySendMsgInfo(pSendInfo);

View File

@ -167,6 +167,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -41,6 +41,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq);
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropTtltbReq(SRpcMsg *pReq);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
@ -64,6 +65,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbReq);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
@ -2176,6 +2178,10 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
return 0; return 0;
} }
static int32_t mndProcessDropTtltbReq(SRpcMsg *pRsp) {
return 0;
}
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;

View File

@ -85,7 +85,11 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
pRaw, pMgmt->transSec, pMgmt->transSeq); pRaw, pMgmt->transSec, pMgmt->transSeq);
if (pMeta->code == 0) { if (pMeta->code == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw); int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
return 0;
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
} }

View File

@ -572,8 +572,20 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
} }
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, mTrace("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
if (pOld->createdTime != pNew->createdTime) {
mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64
", new row:%p stage:%s create:%" PRId64,
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
// only occured while sync timeout
terrno = TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT;
return -1;
}
mndTransUpdateActions(pOld->redoActions, pNew->redoActions); mndTransUpdateActions(pOld->redoActions, pNew->redoActions);
mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions);

View File

@ -321,6 +321,8 @@ struct STsdb {
STsdbFS fs; STsdbFS fs;
SLRUCache *lruCache; SLRUCache *lruCache;
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
}; };
struct TSDBKEY { struct TSDBKEY {
@ -746,6 +748,9 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);

View File

@ -15,6 +15,34 @@
#include "tsdb.h" #include "tsdb.h"
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = taosLRUCacheInit(5 * 1024 * 1024, -1, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->biMutex, NULL);
_err:
pTsdb->biCache = pCache;
return code;
}
static void tsdbCloseBICache(STsdb *pTsdb) {
SLRUCache *pCache = pTsdb->biCache;
if (pCache) {
taosLRUCacheEraseUnrefEntries(pCache);
taosLRUCacheCleanup(pCache);
taosThreadMutexDestroy(&pTsdb->biMutex);
}
}
int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
SLRUCache *pCache = NULL; SLRUCache *pCache = NULL;
@ -26,6 +54,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err; goto _err;
} }
code = tsdbOpenBICache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false); taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->lruMutex, NULL); taosThreadMutexInit(&pTsdb->lruMutex, NULL);
@ -44,6 +78,8 @@ void tsdbCloseCache(STsdb *pTsdb) {
taosThreadMutexDestroy(&pTsdb->lruMutex); taosThreadMutexDestroy(&pTsdb->lruMutex);
} }
tsdbCloseBICache(pTsdb);
} }
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
@ -1475,3 +1511,84 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) {
return usage; return usage;
} }
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
struct {
int32_t fid;
int64_t commitID;
} biKey = {0};
biKey.fid = fid;
biKey.commitID = commitID;
*len = sizeof(biKey);
memcpy(key, &biKey, *len);
}
static int32_t tsdbCacheLoadBlockIdx(SDataFReader *pFileReader, SArray **aBlockIdx) {
SArray *pArray = taosArrayInit(8, sizeof(SBlockIdx));
int32_t code = tsdbReadBlockIdx(pFileReader, pArray);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pArray);
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
*aBlockIdx = pArray;
return code;
}
static void deleteBICache(const void *key, size_t keyLen, void *value) {
SArray *pArray = (SArray *)value;
taosArrayDestroy(pArray);
}
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle) {
int32_t code = 0;
char key[128] = {0};
int keyLen = 0;
getBICacheKey(pFileReader->pSet->fid, pFileReader->pSet->pHeadF->commitID, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STsdb *pTsdb = pFileReader->pTsdb;
taosThreadMutexLock(&pTsdb->biMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
SArray *pArray = NULL;
code = tsdbCacheLoadBlockIdx(pFileReader, &pArray);
// if table's empty or error, return code of -1
if (code != TSDB_CODE_SUCCESS || pArray == NULL) {
taosThreadMutexUnlock(&pTsdb->biMutex);
*handle = NULL;
return 0;
}
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteBICache;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->biMutex);
}
*handle = h;
return code;
}
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0;
taosLRUCacheRelease(pCache, h, false);
return code;
}

View File

@ -79,16 +79,19 @@ typedef struct SIOCostSummary {
int64_t composedBlocks; int64_t composedBlocks;
double buildComposedBlockTime; double buildComposedBlockTime;
double createScanInfoList; double createScanInfoList;
double getTbFromMemTime;
double getTbFromIMemTime;
double initDelSkylineIterTime;
} SIOCostSummary; } SIOCostSummary;
typedef struct SBlockLoadSuppInfo { typedef struct SBlockLoadSuppInfo {
SArray* pColAgg; SArray* pColAgg;
SColumnDataAgg tsColAgg; SColumnDataAgg tsColAgg;
int16_t* colId; int16_t* colId;
int16_t* slotId; int16_t* slotId;
int32_t numOfCols; int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
bool smaValid; // the sma on all queried columns are activated bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SLastBlockReader { typedef struct SLastBlockReader {
@ -168,11 +171,11 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo; SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap; STsdbReadSnap* pReadSnap;
SIOCostSummary cost; SIOCostSummary cost;
STSchema* pSchema; // the newest version schema STSchema* pSchema; // the newest version schema
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader; // the file reader SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index; SArray* pDelIdx; // del file block index;
SVersionRange verRange; SVersionRange verRange;
SBlockInfoBuf blockInfoBuf; SBlockInfoBuf blockInfoBuf;
int32_t step; int32_t step;
@ -219,17 +222,18 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) {
pSupInfo->smaValid = true; pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols; pSupInfo->numOfCols = numOfCols;
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t)*2 + POINTER_BYTES)); pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
if (pSupInfo->colId == NULL) { if (pSupInfo->colId == NULL) {
taosMemoryFree(pSupInfo->colId); taosMemoryFree(pSupInfo->colId);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols)); pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
pSupInfo->buildBuf = (char**) ((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols)); pSupInfo->buildBuf = (char**)((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols));
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSupInfo->colId[i] = pCols[i].colId; pSupInfo->colId[i] = pCols[i].colId;
pSupInfo->slotId[i] = pSlotIdList[i]; pSupInfo->slotId[i] = pSlotIdList[i];
@ -247,7 +251,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) { static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) { while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
STColumn* pTCol = &pSchema->columns[i]; STColumn* pTCol = &pSchema->columns[i];
if (pTCol->colId == pSupInfo->colId[j]) { if (pTCol->colId == pSupInfo->colId[j]) {
if (!IS_BSMA_ON(pTCol)) { if (!IS_BSMA_ON(pTCol)) {
@ -312,7 +316,8 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
} }
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model // NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) { static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
int32_t numOfTables) {
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption // todo use simple hash instead, optimize the memory consumption
SHashObj* pTableMap = SHashObj* pTableMap =
@ -398,9 +403,7 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
taosHashCleanup(pTableMap); taosHashCleanup(pTableMap);
} }
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
return pWindow->skey > pWindow->ekey;
}
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already. // the expired data to client, even it is queried already.
@ -644,17 +647,21 @@ _end:
} }
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx)); // SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx); // int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
if (code != TSDB_CODE_SUCCESS) { LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end; goto _end;
} }
size_t num = taosArrayGetSize(aBlockIdx); SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) { if (num == 0) {
taosArrayDestroy(aBlockIdx); tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
// taosArrayDestroy(aBlockIdx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -690,7 +697,8 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0; pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
_end: _end:
taosArrayDestroy(aBlockIdx); // taosArrayDestroy(aBlockIdx);
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code; return code;
} }
@ -769,7 +777,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el, numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr); pReader->idStr);
pReader->cost.numOfBlocks += total; pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el; pReader->cost.headFileLoadTime += el;
@ -903,7 +910,7 @@ static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* p
// a faster version of copy procedure. // a faster version of copy procedure.
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
int32_t dumpedRows, bool asc) { int32_t dumpedRows, bool asc) {
uint8_t* p = NULL; uint8_t* p = NULL;
if (asc) { if (asc) {
p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
@ -912,22 +919,21 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
p = pData->pData + tDataTypes[pData->type].bytes * startIndex; p = pData->pData + tDataTypes[pData->type].bytes * startIndex;
} }
int32_t step = asc? 1:-1; int32_t step = asc ? 1 : -1;
// make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit // make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// 1. copy data in a batch model // 1. copy data in a batch model
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
// 2. reverse the array list in case of descending order scan data block // 2. reverse the array list in case of descending order scan data block
if (!asc) { if (!asc) {
switch(pColData->info.type) { switch (pColData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT: {
{
int32_t mid = dumpedRows >> 1u; int32_t mid = dumpedRows >> 1u;
int64_t* pts = (int64_t*)pColData->pData; int64_t* pts = (int64_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) { for (int32_t j = 0; j < mid; ++j) {
@ -941,7 +947,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT: { case TSDB_DATA_TYPE_UTINYINT: {
int32_t mid = dumpedRows >> 1u; int32_t mid = dumpedRows >> 1u;
int8_t* pts = (int8_t*)pColData->pData; int8_t* pts = (int8_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) { for (int32_t j = 0; j < mid; ++j) {
int8_t t = pts[j]; int8_t t = pts[j];
@ -1113,7 +1119,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%"PRIu64" elapsed time:%.2f ms, %s", ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows, pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr); unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
@ -1766,7 +1772,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMergerAdd(&merge, pRow, pSchema); tRowMergerAdd(&merge, pRow, pSchema);
} else { } else {
init = true; init = true;
int32_t code = tRowMergerInit(&merge, pRow, pSchema); int32_t code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2177,10 +2183,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
} }
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
int64_t st = 0;
STbData* d = NULL; STbData* d = NULL;
if (pReader->pReadSnap->pMem != NULL) { if (pReader->pReadSnap->pMem != NULL) {
st = taosGetTimestampUs();
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
pReader->cost.getTbFromMemTime += (taosGetTimestampUs() - st) / 1000.0;
if (d != NULL) { if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
@ -2201,7 +2210,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STbData* di = NULL; STbData* di = NULL;
if (pReader->pReadSnap->pIMem != NULL) { if (pReader->pReadSnap->pIMem != NULL) {
st = taosGetTimestampUs();
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid); di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
pReader->cost.getTbFromIMemTime += (taosGetTimestampUs() - st) / 1000.0;
if (di != NULL) { if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter); code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
@ -2220,7 +2231,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
} }
st = taosGetTimestampUs();
initDelSkylineIterator(pBlockScanInfo, pReader, d, di); initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;
pBlockScanInfo->iterInit = true; pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2296,7 +2309,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLas
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
return false; // this is an invalid result. return false; // this is an invalid result.
} }
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
} }
@ -2447,7 +2460,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly // it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) { pBlock->nRow <= pReader->capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo); copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
@ -2611,7 +2624,7 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
hasIKey = true; hasIKey = true;
ikey = TSDBROW_KEY(pIRow); ikey = TSDBROW_KEY(pIRow);
} }
if (hasKey) { if (hasKey) {
if (hasIKey) { // has data in mem & imem if (hasIKey) { // has data in mem & imem
if (asc) { if (asc) {
@ -2666,7 +2679,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
taosArrayDestroy(pIndexList); taosArrayDestroy(pIndexList);
if (pReader->pReadSnap != NULL) { if (pReader->pReadSnap != NULL) {
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile; SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
if (pReader->pDelFReader == NULL && pDelFile != NULL) { if (pReader->pDelFReader == NULL && pDelFile != NULL) {
int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb); int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
@ -2854,7 +2866,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
ASSERT(pBlockInfo != NULL); ASSERT(pBlockInfo != NULL);
// if (pBlockInfo != NULL) { // if (pBlockInfo != NULL) {
pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
// } else { // } else {
// pScanInfo = *pReader->status.pTableIter; // pScanInfo = *pReader->status.pTableIter;
@ -2866,9 +2878,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code; return code;
} }
// if (pBlockInfo != NULL) { // if (pBlockInfo != NULL) {
pBlock = getCurrentBlock(pBlockIter); pBlock = getCurrentBlock(pBlockIter);
// } // }
initLastBlockReader(pLastBlockReader, pScanInfo, pReader); initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
@ -2933,7 +2945,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// update the last key for the corresponding table // update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey; pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts, pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
pBlock->maxKey.ts, pReader->idStr); pBlock->maxKey.ts, pReader->idStr);
@ -3188,7 +3201,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
return false; return false;
} else if (pKey->ts == last->ts) { } else if (pKey->ts == last->ts) {
TSDBKEY* prev = taosArrayGet(pDelList, num - 2); TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer); return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
prev->version >= pVerRange->minVer);
} }
} else { } else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index); TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
@ -3372,7 +3386,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*state = CHECK_FILEBLOCK_QUIT; *state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
bool loadNeighbor = true; bool loadNeighbor = true;
int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor); int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) { if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
@ -3624,7 +3638,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
SColVal colVal = {0}; SColVal colVal = {0};
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
if (pSupInfo->colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts; ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
i += 1; i += 1;
@ -3669,7 +3683,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t outputRowIndex = pResBlock->info.rows; int32_t outputRowIndex = pResBlock->info.rows;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
if (pReader->suppInfo.colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) { if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]); SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex]; ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1; i += 1;
@ -4011,14 +4025,17 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pLReader); taosMemoryFree(pLReader);
} }
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 tsdbDebug(
" SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, " "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64 " SMA-time:%.2f ms, fileBlocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms, %s", ", fileBlocks-load-time:%.2f ms, "
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms,"
pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime, "getTbFromMem-time:%.2f ms, getTbFromIMem-time:%.2f ms, initDelSkylineIterTime:%.2f ms, %s",
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr); pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
pCost->getTbFromMemTime, pCost->getTbFromIMemTime, pCost->initDelSkylineIterTime, pReader->idStr);
taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema); taosMemoryFree(pReader->pSchema);
@ -4034,7 +4051,7 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
if (taosHashGetSize(pStatus->pTableMap) == 0){ if (taosHashGetSize(pStatus->pTableMap) == 0) {
return false; return false;
} }
@ -4124,12 +4141,10 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64
} }
} }
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols,
SColumnDataAgg* pTsAgg) {
// do fill all null column value SMA info // do fill all null column value SMA info
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t size = (int32_t) taosArrayGetSize(pSup->pColAgg); int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg); taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
@ -4142,7 +4157,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i ,&nullColAgg); taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
} }
j += 1; j += 1;
} }

View File

@ -1205,7 +1205,7 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk); tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk);
pWriter->pDIter->dIter.iDataBlk++; pWriter->pDIter->dIter.iDataBlk++;
} else { } else {
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);

View File

@ -929,8 +929,9 @@ int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) {
return code; return code;
} }
/*
// delete skyline ====================================================== // delete skyline ======================================================
static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) { static int32_t tsdbMergeSkyline2(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
int32_t code = 0; int32_t code = 0;
int32_t i1 = 0; int32_t i1 = 0;
int32_t n1 = taosArrayGetSize(aSkyline1); int32_t n1 = taosArrayGetSize(aSkyline1);
@ -996,7 +997,141 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS
_exit: _exit:
return code; return code;
} }
*/
// delete skyline ======================================================
static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
int32_t code = 0;
int32_t i1 = 0;
int32_t n1 = taosArrayGetSize(pSkyline1);
int32_t i2 = 0;
int32_t n2 = taosArrayGetSize(pSkyline2);
TSDBKEY *pKey1;
TSDBKEY *pKey2;
int64_t version1 = 0;
int64_t version2 = 0;
ASSERT(n1 > 0 && n2 > 0);
taosArrayClear(pSkyline);
TSDBKEY **pItem = TARRAY_GET_ELEM(pSkyline, 0);
while (i1 < n1 && i2 < n2) {
pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1);
pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2);
if (pKey1->ts < pKey2->ts) {
version1 = pKey1->version;
*pItem = pKey1;
i1++;
} else if (pKey1->ts > pKey2->ts) {
version2 = pKey2->version;
*pItem = pKey2;
i2++;
} else {
version1 = pKey1->version;
version2 = pKey2->version;
*pItem = pKey1;
i1++;
i2++;
}
(*pItem)->version = TMAX(version1, version2);
pItem++;
}
while (i1 < n1) {
pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1);
*pItem = pKey1;
pItem++;
i1++;
}
while (i2 < n2) {
pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2);
*pItem = pKey2;
pItem++;
i2++;
}
taosArraySetSize(pSkyline, TARRAY_ELEM_IDX(pSkyline, pItem));
_exit:
return code;
}
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
int32_t code = 0;
SDelData *pDelData;
int32_t midx;
taosArrayClear(pSkyline);
if (sidx == eidx) {
TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2);
TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1);
taosArrayPush(pSkyline, &pItem1);
taosArrayPush(pSkyline, &pItem2);
} else {
SArray *pSkyline1 = NULL;
SArray *pSkyline2 = NULL;
midx = (sidx + eidx) / 2;
pSkyline1 = taosArrayInit((midx - sidx + 1) * 2, POINTER_BYTES);
pSkyline2 = taosArrayInit((eidx - midx) * 2, POINTER_BYTES);
if (pSkyline1 == NULL || pSkyline1 == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _clear;
}
code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1);
if (code) goto _clear;
code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2);
if (code) goto _clear;
code = tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);
_clear:
taosArrayDestroy(pSkyline1);
taosArrayDestroy(pSkyline2);
}
return code;
}
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) { int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
SDelData *pDelData;
int32_t code = 0;
int32_t dataNum = eidx - sidx + 1;
SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
for (int32_t i = sidx; i <= eidx; ++i) {
pDelData = (SDelData *)taosArrayGet(aDelData, i);
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version});
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0});
}
code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline);
if (code) goto _clear;
int32_t skylineNum = taosArrayGetSize(pSkyline);
for (int32_t i = 0; i < skylineNum; ++i) {
TSDBKEY *p = taosArrayGetP(pSkyline, i);
taosArrayPush(aSkyline, p);
}
_clear:
taosArrayDestroy(aTmpSkyline);
taosArrayDestroy(pSkyline);
return code;
}
/*
int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
int32_t code = 0; int32_t code = 0;
SDelData *pDelData; SDelData *pDelData;
int32_t midx; int32_t midx;
@ -1033,6 +1168,7 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
return code; return code;
} }
*/
// SBlockData ====================================================== // SBlockData ======================================================
int32_t tBlockDataCreate(SBlockData *pBlockData) { int32_t tBlockDataCreate(SBlockData *pBlockData) {

View File

@ -447,32 +447,6 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t
taosResetFillInfo(pFillInfo, skey); taosResetFillInfo(pFillInfo, skey);
switch (fillType) {
case FILL_MODE_NONE:
pFillInfo->type = TSDB_FILL_NONE;
break;
case FILL_MODE_PREV:
pFillInfo->type = TSDB_FILL_PREV;
break;
case FILL_MODE_NULL:
pFillInfo->type = TSDB_FILL_NULL;
break;
case FILL_MODE_LINEAR:
pFillInfo->type = TSDB_FILL_LINEAR;
break;
case FILL_MODE_NEXT:
pFillInfo->type = TSDB_FILL_NEXT;
break;
case FILL_MODE_VALUE:
pFillInfo->type = TSDB_FILL_SET_VALUE;
break;
default: {
taosMemoryFree(pFillInfo);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
}
pFillInfo->type = fillType; pFillInfo->type = fillType;
pFillInfo->pFillCol = pCol; pFillInfo->pFillCol = pCol;
pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols; pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols;

View File

@ -262,6 +262,13 @@ bool fmIsGroupKeyFunc(int32_t funcId) {
return FUNCTION_TYPE_GROUP_KEY == funcMgtBuiltins[funcId].type; return FUNCTION_TYPE_GROUP_KEY == funcMgtBuiltins[funcId].type;
} }
bool fmIsBlockDistFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
}
return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type;
}
void fmFuncMgtDestroy() { void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable; void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {

View File

@ -22,11 +22,11 @@ struct SToken;
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) #define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
#define NEXT_TOKEN(pSql, sToken) \ #define NEXT_TOKEN(pSql, sToken) \
do { \ do { \
int32_t index = 0; \ int32_t index = 0; \
sToken = tStrGetToken(pSql, &index, false); \ sToken = tStrGetToken(pSql, &index, false, NULL); \
pSql += index; \ pSql += index; \
} while (0) } while (0)
#define CHECK_CODE(expr) \ #define CHECK_CODE(expr) \

View File

@ -55,7 +55,7 @@ uint32_t tGetToken(const char *z, uint32_t *tokenType);
* @param isPrevOptr * @param isPrevOptr
* @return * @return
*/ */
SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr); SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr, bool *pIgnoreComma);
/** /**
* check if it is a keyword or not * check if it is a keyword or not

View File

@ -18,16 +18,23 @@
#include "tglobal.h" #include "tglobal.h"
#include "ttime.h" #include "ttime.h"
#define NEXT_TOKEN_WITH_PREV(pSql, token) \ #define NEXT_TOKEN_WITH_PREV(pSql, token) \
do { \ do { \
int32_t index = 0; \ int32_t index = 0; \
token = tStrGetToken(pSql, &index, true); \ token = tStrGetToken(pSql, &index, true, NULL); \
pSql += index; \ pSql += index; \
} while (0) } while (0)
#define NEXT_TOKEN_KEEP_SQL(pSql, token, index) \ #define NEXT_TOKEN_WITH_PREV_EXT(pSql, token, pIgnoreComma) \
do { \ do { \
token = tStrGetToken(pSql, &index, false); \ int32_t index = 0; \
token = tStrGetToken(pSql, &index, true, pIgnoreComma); \
pSql += index; \
} while (0)
#define NEXT_TOKEN_KEEP_SQL(pSql, token, index) \
do { \
token = tStrGetToken(pSql, &index, false, NULL); \
} while (0) } while (0)
#define NEXT_VALID_TOKEN(pSql, token) \ #define NEXT_VALID_TOKEN(pSql, token) \
@ -302,12 +309,12 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
* e.g., now+12a, now-5h * e.g., now+12a, now-5h
*/ */
index = 0; index = 0;
SToken token = tStrGetToken(pTokenEnd, &index, false); SToken token = tStrGetToken(pTokenEnd, &index, false, NULL);
pTokenEnd += index; pTokenEnd += index;
if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) { if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) {
index = 0; index = 0;
SToken valueToken = tStrGetToken(pTokenEnd, &index, false); SToken valueToken = tStrGetToken(pTokenEnd, &index, false, NULL);
pTokenEnd += index; pTokenEnd += index;
if (valueToken.n < 2) { if (valueToken.n < 2) {
@ -1240,30 +1247,35 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB
int32_t code = tdSRowResetBuf(pBuilder, row); int32_t code = tdSRowResetBuf(pBuilder, row);
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) { for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
NEXT_TOKEN_WITH_PREV(*pSql, *pToken); const char* pOrigSql = *pSql;
SSchema* pSchema = &pSchemas[pCols->boundColumns[i]]; bool ignoreComma = false;
NEXT_TOKEN_WITH_PREV_EXT(*pSql, *pToken, &ignoreComma);
if (ignoreComma) {
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql);
break;
}
if (pToken->type == TK_NK_QUESTION) { if (pToken->type == TK_NK_QUESTION) {
isParseBindParam = true; isParseBindParam = true;
if (NULL == pCxt->pComCxt->pStmtCb) { if (NULL == pCxt->pComCxt->pStmtCb) {
code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z); code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
break;
}
} else {
if (TK_NK_RP == pToken->type) {
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
break;
} }
continue;
}
if (TSDB_CODE_SUCCESS == code && TK_NK_RP == pToken->type) { if (isParseBindParam) {
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
} break;
}
if (TSDB_CODE_SUCCESS == code && isParseBindParam) { param.schema = &pSchemas[pCols->boundColumns[i]];
code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
}
if (TSDB_CODE_SUCCESS == code) {
param.schema = pSchema;
insGetSTSRowAppendInfo(pBuilder->rowType, pCols, i, &param.toffset, &param.colIdx); insGetSTSRowAppendInfo(pBuilder->rowType, pCols, i, &param.toffset, &param.colIdx);
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pDataBuf->pTableMeta).precision, insMemRowAppend, code = parseValueToken(pCxt, pSql, pToken, param.schema, getTableInfo(pDataBuf->pTableMeta).precision,
&param); insMemRowAppend, &param);
} }
if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) { if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {

View File

@ -620,7 +620,7 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
return 0; return 0;
} }
SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) { SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreComma) {
SToken t0 = {0}; SToken t0 = {0};
// here we reach the end of sql string, null-terminated string // here we reach the end of sql string, null-terminated string
@ -641,6 +641,10 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
return t0; return t0;
} }
if (NULL != pIgnoreComma && t == ',') {
*pIgnoreComma = true;
}
t = str[++(*i)]; t = str[++(*i)];
} }

View File

@ -1561,6 +1561,26 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateBlockDistFunc(STranslateContext* pCtx, SFunctionNode* pFunc) {
if (!fmIsBlockDistFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCtx->pCurrStmt)) {
return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
SSelectStmt* pSelect = (SSelectStmt*)pCtx->pCurrStmt;
SNode* pTable = pSelect->pFromTable;
if (NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_SUPER_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) {
return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s is only supported on super table, child table or normal table", pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static bool isStar(SNode* pNode) { static bool isStar(SNode* pNode) {
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) && return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) &&
(0 == strcmp(((SColumnNode*)pNode)->colName, "*")); (0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
@ -1720,7 +1740,7 @@ static int32_t rewriteSystemInfoFunc(STranslateContext* pCxt, SNode** pNode) {
return TSDB_CODE_PAR_INTERNAL_ERROR; return TSDB_CODE_PAR_INTERNAL_ERROR;
} }
static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { static int32_t translateNormalFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
int32_t code = translateAggFunc(pCxt, pFunc); int32_t code = translateAggFunc(pCxt, pFunc);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateScanPseudoColumnFunc(pCxt, pFunc); code = translateScanPseudoColumnFunc(pCxt, pFunc);
@ -1752,6 +1772,9 @@ static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* p
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateTimelineFunc(pCxt, pFunc); code = translateTimelineFunc(pCxt, pFunc);
} }
if (TSDB_CODE_SUCCESS == code) {
code = translateBlockDistFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setFuncClassification(pCxt->pCurrStmt, pFunc); setFuncClassification(pCxt->pCurrStmt, pFunc);
} }
@ -1812,7 +1835,7 @@ static int32_t translateFunctionImpl(STranslateContext* pCxt, SFunctionNode** pF
if (fmIsClientPseudoColumnFunc((*pFunc)->funcId)) { if (fmIsClientPseudoColumnFunc((*pFunc)->funcId)) {
return rewriteClientPseudoColumnFunc(pCxt, (SNode**)pFunc); return rewriteClientPseudoColumnFunc(pCxt, (SNode**)pFunc);
} }
return translateNoramlFunction(pCxt, *pFunc); return translateNormalFunction(pCxt, *pFunc);
} }
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc) { static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc) {

View File

@ -27,7 +27,7 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
const char* pSql = pStr; const char* pSql = pStr;
int32_t index = 0; int32_t index = 0;
SToken t = tStrGetToken((char*)pStr, &index, false); SToken t = tStrGetToken((char*)pStr, &index, false, NULL);
if (TK_INSERT != t.type && TK_IMPORT != t.type) { if (TK_INSERT != t.type && TK_IMPORT != t.type) {
return false; return false;
} }
@ -35,7 +35,7 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
do { do {
pStr += index; pStr += index;
index = 0; index = 0;
t = tStrGetToken((char*)pStr, &index, false); t = tStrGetToken((char*)pStr, &index, false, NULL);
if (TK_USING == t.type || TK_VALUES == t.type || TK_FILE == t.type) { if (TK_USING == t.type || TK_VALUES == t.type || TK_FILE == t.type) {
return true; return true;
} else if (TK_SELECT == t.type) { } else if (TK_SELECT == t.type) {

View File

@ -510,16 +510,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
SSyncLogStoreData *pData = ths->pLogStore->data; SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal; SWal *pWal = pData->pWal;
bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
int64_t walCommitVer = walGetCommittedVer(pWal); int64_t walCommitVer = walGetCommittedVer(pWal);
snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;
if (!isEmpty && ths->commitIndex != walCommitVer) {
sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
ths->commitIndex);
snapStart = walCommitVer + 1;
} else {
snapStart = ths->commitIndex + 1;
}
sNInfo(ths, "snapshot begin index is %" PRId64, snapStart); sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
} }

View File

@ -457,6 +457,11 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
SPgno journalSize = 0; SPgno journalSize = 0;
int ret; int ret;
if (pTxn->jfd == 0) {
// txn is commited
return 0;
}
// sync the journal file // sync the journal file
ret = tdbOsFSync(pTxn->jfd); ret = tdbOsFSync(pTxn->jfd);
if (ret < 0) { if (ret < 0) {

View File

@ -1045,6 +1045,12 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
if (v == NULL) { if (v == NULL) {
addr = taosGetIpv4FromFqdn(fqdn); addr = taosGetIpv4FromFqdn(fqdn);
if (addr == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
return addr;
}
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
} else { } else {
addr = *v; addr = *v;
@ -1116,9 +1122,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
if (ipaddr == 0xffffffff) {
uv_timer_stop(conn->timer);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExcept(conn);
return;
}
struct sockaddr_in addr; struct sockaddr_in addr;
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons((uint16_t)conn->port); addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
STraceId* trace = &(pMsg->msg.info.traceId); STraceId* trace = &(pMsg->msg.info.traceId);

View File

@ -944,7 +944,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0) { if (iResult != 0) {
// printf("WSAStartup failed: %d\n", iResult); // printf("WSAStartup failed: %d\n", iResult);
return 1; return 0xFFFFFFFF;
} }
#endif #endif
struct addrinfo hints = {0}; struct addrinfo hints = {0};

View File

@ -91,6 +91,10 @@ print ============== TD-5998
sql_error select _block_dist() from (select * from $nt) sql_error select _block_dist() from (select * from $nt)
sql_error select _block_dist() from (select * from $mt) sql_error select _block_dist() from (select * from $mt)
print ============== TD-22140 & TD-22165
sql_error show table distributed information_schema.ins_databases
sql_error show table distributed performance_schema.perf_apps
print =============== clear print =============== clear
sql drop database $db sql drop database $db
sql select * from information_schema.ins_databases sql select * from information_schema.ins_databases