Merge remote-tracking branch 'origin' into feat/TS-2502

This commit is contained in:
dapan1121 2023-02-06 17:22:18 +08:00
commit ded47e1312
56 changed files with 1408 additions and 439 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG a0234fe
GIT_TAG 6a2d9fc
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

6
docs/examples/go/go.mod Normal file
View File

@ -0,0 +1,6 @@
module goexample
go 1.17
require github.com/taosdata/driver-go/v3 v3.1.0

View File

@ -218,6 +218,7 @@ bool fmIsKeepOrderFunc(int32_t funcId);
bool fmIsCumulativeFunc(int32_t funcId);
bool fmIsInterpPseudoColumnFunc(int32_t funcId);
bool fmIsGroupKeyFunc(int32_t funcId);
bool fmIsBlockDistFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType);

View File

@ -108,7 +108,8 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf,
int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);

View File

@ -152,9 +152,10 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName, bool autoCreateTbl) {
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName,
bool autoCreateTbl) {
STscStmt* pStmt = (STscStmt*)stmt;
char tbFName[TSDB_TABLE_FNAME_LEN];
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(tbName, tbFName);
memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
@ -772,9 +773,9 @@ int stmtAddBatch(TAOS_STMT* stmt) {
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);
int32_t code = 0;
int32_t finalCode = 0;
size_t keyLen = 0;
int32_t code = 0;
int32_t finalCode = 0;
size_t keyLen = 0;
STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks* pBlock = *pIter;
@ -844,7 +845,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
pMeta->uid = pTableMeta->uid;
pStmt->bInfo.tbUid = pTableMeta->uid;
taosMemoryFree(pTableMeta);
taosMemoryFree(pTableMeta);
}
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);

View File

@ -1891,9 +1891,6 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
}
tmq_list_destroy(lst);
/*return rsp;*/
return 0;
}
taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
return 0;

View File

@ -1431,6 +1431,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pBlock->info.rows = 0;
pBlock->info.capacity = 0;
pBlock->info.rowSize = 0;
pBlock->info.id = pDataBlock->info.id;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {

View File

@ -167,6 +167,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -836,10 +836,13 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic
bool existing = false;
#if 1
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
ASSERT(strcmp(topic, addedTopic) != 0);
if (strcmp(topic, addedTopic) == 0) {
existing = true;
}
}
#endif
@ -854,8 +857,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
}
// add to current topic
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
if (!existing) {
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
}
// set status
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {

View File

@ -41,6 +41,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq);
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropTtltbReq(SRpcMsg *pReq);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
@ -64,6 +65,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbReq);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
@ -2176,6 +2178,10 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
return 0;
}
static int32_t mndProcessDropTtltbReq(SRpcMsg *pRsp) {
return 0;
}
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;

View File

@ -85,7 +85,11 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
pRaw, pMgmt->transSec, pMgmt->transSeq);
if (pMeta->code == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
return 0;
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
}

View File

@ -572,8 +572,20 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
mTrace("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
if (pOld->createdTime != pNew->createdTime) {
mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64
", new row:%p stage:%s create:%" PRId64,
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
// only occured while sync timeout
terrno = TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT;
return -1;
}
mndTransUpdateActions(pOld->redoActions, pNew->redoActions);
mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions);

View File

@ -93,6 +93,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
ASSERT(pTask->exec.executor);
streamSetupTrigger(pTask);
return 0;
}

View File

@ -321,6 +321,8 @@ struct STsdb {
STsdbFS fs;
SLRUCache *lruCache;
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
};
struct TSDBKEY {
@ -746,6 +748,9 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);

View File

@ -56,4 +56,7 @@ int metaPrepareAsyncCommit(SMeta *pMeta) {
}
// abort the meta txn
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, pMeta->txn); }
int metaAbort(SMeta *pMeta) {
if (!pMeta->txn) return 0;
return tdbAbort(pMeta->pEnv, pMeta->txn);
}

View File

@ -203,7 +203,7 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pEnv) tdbAbort(pMeta->pEnv, pMeta->txn);
if (pMeta->pEnv) metaAbort(pMeta);
if (pMeta->pCache) metaCacheClose(pMeta);
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);

View File

@ -93,21 +93,21 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
if (tqMetaOpen(pTq) < 0) {
ASSERT(0);
return NULL;
}
pTq->pOffsetStore = tqOffsetOpen(pTq);
if (pTq->pOffsetStore == NULL) {
ASSERT(0);
return NULL;
}
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
if (pTq->pStreamMeta == NULL) {
ASSERT(0);
return NULL;
}
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
ASSERT(0);
return NULL;
}
return pTq;

View File

@ -15,6 +15,34 @@
#include "tsdb.h"
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = taosLRUCacheInit(5 * 1024 * 1024, -1, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->biMutex, NULL);
_err:
pTsdb->biCache = pCache;
return code;
}
static void tsdbCloseBICache(STsdb *pTsdb) {
SLRUCache *pCache = pTsdb->biCache;
if (pCache) {
taosLRUCacheEraseUnrefEntries(pCache);
taosLRUCacheCleanup(pCache);
taosThreadMutexDestroy(&pTsdb->biMutex);
}
}
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
@ -26,6 +54,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err;
}
code = tsdbOpenBICache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->lruMutex, NULL);
@ -44,6 +78,8 @@ void tsdbCloseCache(STsdb *pTsdb) {
taosThreadMutexDestroy(&pTsdb->lruMutex);
}
tsdbCloseBICache(pTsdb);
}
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
@ -1475,3 +1511,84 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) {
return usage;
}
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
struct {
int32_t fid;
int64_t commitID;
} biKey = {0};
biKey.fid = fid;
biKey.commitID = commitID;
*len = sizeof(biKey);
memcpy(key, &biKey, *len);
}
static int32_t tsdbCacheLoadBlockIdx(SDataFReader *pFileReader, SArray **aBlockIdx) {
SArray *pArray = taosArrayInit(8, sizeof(SBlockIdx));
int32_t code = tsdbReadBlockIdx(pFileReader, pArray);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pArray);
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
*aBlockIdx = pArray;
return code;
}
static void deleteBICache(const void *key, size_t keyLen, void *value) {
SArray *pArray = (SArray *)value;
taosArrayDestroy(pArray);
}
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle) {
int32_t code = 0;
char key[128] = {0};
int keyLen = 0;
getBICacheKey(pFileReader->pSet->fid, pFileReader->pSet->pHeadF->commitID, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STsdb *pTsdb = pFileReader->pTsdb;
taosThreadMutexLock(&pTsdb->biMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
SArray *pArray = NULL;
code = tsdbCacheLoadBlockIdx(pFileReader, &pArray);
// if table's empty or error, return code of -1
if (code != TSDB_CODE_SUCCESS || pArray == NULL) {
taosThreadMutexUnlock(&pTsdb->biMutex);
*handle = NULL;
return 0;
}
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteBICache;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->biMutex);
}
*handle = h;
return code;
}
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0;
taosLRUCacheRelease(pCache, h, false);
return code;
}

View File

@ -79,16 +79,19 @@ typedef struct SIOCostSummary {
int64_t composedBlocks;
double buildComposedBlockTime;
double createScanInfoList;
double getTbFromMemTime;
double getTbFromIMemTime;
double initDelSkylineIterTime;
} SIOCostSummary;
typedef struct SBlockLoadSuppInfo {
SArray* pColAgg;
SColumnDataAgg tsColAgg;
int16_t* colId;
int16_t* slotId;
int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
bool smaValid; // the sma on all queried columns are activated
SArray* pColAgg;
SColumnDataAgg tsColAgg;
int16_t* colId;
int16_t* slotId;
int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo;
typedef struct SLastBlockReader {
@ -168,11 +171,11 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
STSchema* pSchema; // the newest version schema
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
STSchema* pSchema; // the newest version schema
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SVersionRange verRange;
SBlockInfoBuf blockInfoBuf;
int32_t step;
@ -219,17 +222,18 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) {
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) {
pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols;
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t)*2 + POINTER_BYTES));
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
if (pSupInfo->colId == NULL) {
taosMemoryFree(pSupInfo->colId);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
pSupInfo->buildBuf = (char**) ((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols));
pSupInfo->buildBuf = (char**)((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols));
for (int32_t i = 0; i < numOfCols; ++i) {
pSupInfo->colId[i] = pCols[i].colId;
pSupInfo->slotId[i] = pSlotIdList[i];
@ -247,7 +251,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
int32_t i = 0, j = 0;
while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
STColumn* pTCol = &pSchema->columns[i];
if (pTCol->colId == pSupInfo->colId[j]) {
if (!IS_BSMA_ON(pTCol)) {
@ -312,7 +316,8 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SHashObj* pTableMap =
@ -398,9 +403,7 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
taosHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
return pWindow->skey > pWindow->ekey;
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
@ -644,17 +647,21 @@ _end:
}
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
// SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
int64_t st = taosGetTimestampUs();
int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
if (code != TSDB_CODE_SUCCESS) {
// int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end;
}
size_t num = taosArrayGetSize(aBlockIdx);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) {
taosArrayDestroy(aBlockIdx);
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
// taosArrayDestroy(aBlockIdx);
return TSDB_CODE_SUCCESS;
}
@ -690,7 +697,8 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
_end:
taosArrayDestroy(aBlockIdx);
// taosArrayDestroy(aBlockIdx);
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
@ -769,7 +777,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr);
pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el;
@ -903,7 +910,7 @@ static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* p
// a faster version of copy procedure.
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
int32_t dumpedRows, bool asc) {
int32_t dumpedRows, bool asc) {
uint8_t* p = NULL;
if (asc) {
p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
@ -912,22 +919,21 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
p = pData->pData + tDataTypes[pData->type].bytes * startIndex;
}
int32_t step = asc? 1:-1;
int32_t step = asc ? 1 : -1;
// make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// 1. copy data in a batch model
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
// 2. reverse the array list in case of descending order scan data block
if (!asc) {
switch(pColData->info.type) {
switch (pColData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
{
case TSDB_DATA_TYPE_UBIGINT: {
int32_t mid = dumpedRows >> 1u;
int64_t* pts = (int64_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) {
@ -941,7 +947,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT: {
int32_t mid = dumpedRows >> 1u;
int32_t mid = dumpedRows >> 1u;
int8_t* pts = (int8_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) {
int8_t t = pts[j];
@ -1113,7 +1119,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%"PRIu64" elapsed time:%.2f ms, %s",
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
@ -1767,7 +1773,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMergerAdd(&merge, pRow, pSchema);
} else {
init = true;
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -2178,10 +2184,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
}
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
int64_t st = 0;
STbData* d = NULL;
if (pReader->pReadSnap->pMem != NULL) {
st = taosGetTimestampUs();
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
pReader->cost.getTbFromMemTime += (taosGetTimestampUs() - st) / 1000.0;
if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
if (code == TSDB_CODE_SUCCESS) {
@ -2202,7 +2211,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STbData* di = NULL;
if (pReader->pReadSnap->pIMem != NULL) {
st = taosGetTimestampUs();
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
pReader->cost.getTbFromIMemTime += (taosGetTimestampUs() - st) / 1000.0;
if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
if (code == TSDB_CODE_SUCCESS) {
@ -2221,7 +2232,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
}
st = taosGetTimestampUs();
initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;
pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS;
@ -2297,7 +2310,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLas
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
return false; // this is an invalid result.
return false; // this is an invalid result.
}
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
}
@ -2448,7 +2461,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
@ -2651,7 +2664,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
taosArrayDestroy(pIndexList);
if (pReader->pReadSnap != NULL) {
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
if (pReader->pDelFReader == NULL && pDelFile != NULL) {
int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
@ -2839,12 +2851,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
ASSERT(pBlockInfo != NULL);
if (pBlockInfo != NULL) {
pScanInfo =
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
} else {
pScanInfo = *pReader->status.pTableIter;
}
// if (pBlockInfo != NULL) {
pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
// } else {
// pScanInfo = *pReader->status.pTableIter;
// }
if (pScanInfo == NULL) {
tsdbError("failed to get table scan-info, %s", pReader->idStr);
@ -2852,46 +2863,14 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
if (pBlockInfo != NULL) {
pBlock = getCurrentBlock(pBlockIter);
}
// if (pBlockInfo != NULL) {
pBlock = getCurrentBlock(pBlockIter);
// }
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
/*if (pBlockInfo == NULL) { // build data block from last data file
SBlockData* pBData = &pReader->status.fileBlockData;
tBlockDataReset(pBData);
SSDataBlock* pResBlock = pReader->pResBlock;
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
int64_t st = taosGetTimestampUs();
while (1) {
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
// no data in last block and block, no need to proceed.
if (hasBlockLData == false) {
break;
}
buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (pResBlock->info.rows >= pReader->capacity) {
break;
}
}
double el = (taosGetTimestampUs() - st) / 1000.0;
updateComposedBlockInfo(pReader, el, pScanInfo);
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
} else*/ if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2937,7 +2916,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%d, elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
@ -2951,6 +2930,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
pBlock->maxKey.ts, pReader->idStr);
}
}
@ -3202,7 +3186,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
return false;
} else if (pKey->ts == last->ts) {
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
prev->version >= pVerRange->minVer);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
@ -3386,7 +3371,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
bool loadNeighbor = true;
bool loadNeighbor = true;
int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
@ -3638,7 +3623,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
SColVal colVal = {0};
int32_t i = 0, j = 0;
if (pSupInfo->colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
i += 1;
@ -3683,7 +3668,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t outputRowIndex = pResBlock->info.rows;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
if (pReader->suppInfo.colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1;
@ -4025,16 +4010,17 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pLReader);
}
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64
", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);
tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64
", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms,"
", getTbFromMem-time:%.2f ms, getTbFromIMem-time:%.2f ms, initDelSkylineIterTime:%.2f ms, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
pCost->getTbFromMemTime, pCost->getTbFromIMemTime, pCost->initDelSkylineIterTime, pReader->idStr);
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
@ -4050,7 +4036,7 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
blockDataCleanup(pBlock);
SReaderStatus* pStatus = &pReader->status;
if (taosHashGetSize(pStatus->pTableMap) == 0){
if (taosHashGetSize(pStatus->pTableMap) == 0) {
return false;
}
@ -4140,12 +4126,10 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64
}
}
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols,
SColumnDataAgg* pTsAgg) {
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
// do fill all null column value SMA info
int32_t i = 0, j = 0;
int32_t size = (int32_t) taosArrayGetSize(pSup->pColAgg);
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
while (j < numOfCols && i < size) {
@ -4158,7 +4142,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} else if (pSup->colId[j] < pAgg->colId) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i ,&nullColAgg);
taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
}
j += 1;
}
@ -4427,7 +4411,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) {
if (pReader->pReadSnap->pMem != NULL) {
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
if (d != NULL) {
rows += tsdbGetNRowsInTbData(d);
@ -4435,7 +4419,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
}
STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) {
if (pReader->pReadSnap->pIMem != NULL) {
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
if (di != NULL) {
rows += tsdbGetNRowsInTbData(di);

View File

@ -1205,7 +1205,7 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
TSDB_CHECK_CODE(code, lino, _exit);
}
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk);
pWriter->pDIter->dIter.iDataBlk++;
} else {
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);

View File

@ -929,8 +929,9 @@ int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) {
return code;
}
/*
// delete skyline ======================================================
static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
static int32_t tsdbMergeSkyline2(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
int32_t code = 0;
int32_t i1 = 0;
int32_t n1 = taosArrayGetSize(aSkyline1);
@ -996,7 +997,141 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS
_exit:
return code;
}
*/
// delete skyline ======================================================
static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
int32_t code = 0;
int32_t i1 = 0;
int32_t n1 = taosArrayGetSize(pSkyline1);
int32_t i2 = 0;
int32_t n2 = taosArrayGetSize(pSkyline2);
TSDBKEY *pKey1;
TSDBKEY *pKey2;
int64_t version1 = 0;
int64_t version2 = 0;
ASSERT(n1 > 0 && n2 > 0);
taosArrayClear(pSkyline);
TSDBKEY **pItem = TARRAY_GET_ELEM(pSkyline, 0);
while (i1 < n1 && i2 < n2) {
pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1);
pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2);
if (pKey1->ts < pKey2->ts) {
version1 = pKey1->version;
*pItem = pKey1;
i1++;
} else if (pKey1->ts > pKey2->ts) {
version2 = pKey2->version;
*pItem = pKey2;
i2++;
} else {
version1 = pKey1->version;
version2 = pKey2->version;
*pItem = pKey1;
i1++;
i2++;
}
(*pItem)->version = TMAX(version1, version2);
pItem++;
}
while (i1 < n1) {
pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1);
*pItem = pKey1;
pItem++;
i1++;
}
while (i2 < n2) {
pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2);
*pItem = pKey2;
pItem++;
i2++;
}
taosArraySetSize(pSkyline, TARRAY_ELEM_IDX(pSkyline, pItem));
_exit:
return code;
}
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
int32_t code = 0;
SDelData *pDelData;
int32_t midx;
taosArrayClear(pSkyline);
if (sidx == eidx) {
TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2);
TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1);
taosArrayPush(pSkyline, &pItem1);
taosArrayPush(pSkyline, &pItem2);
} else {
SArray *pSkyline1 = NULL;
SArray *pSkyline2 = NULL;
midx = (sidx + eidx) / 2;
pSkyline1 = taosArrayInit((midx - sidx + 1) * 2, POINTER_BYTES);
pSkyline2 = taosArrayInit((eidx - midx) * 2, POINTER_BYTES);
if (pSkyline1 == NULL || pSkyline1 == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _clear;
}
code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1);
if (code) goto _clear;
code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2);
if (code) goto _clear;
code = tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);
_clear:
taosArrayDestroy(pSkyline1);
taosArrayDestroy(pSkyline2);
}
return code;
}
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
SDelData *pDelData;
int32_t code = 0;
int32_t dataNum = eidx - sidx + 1;
SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
for (int32_t i = sidx; i <= eidx; ++i) {
pDelData = (SDelData *)taosArrayGet(aDelData, i);
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version});
taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0});
}
code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline);
if (code) goto _clear;
int32_t skylineNum = taosArrayGetSize(pSkyline);
for (int32_t i = 0; i < skylineNum; ++i) {
TSDBKEY *p = taosArrayGetP(pSkyline, i);
taosArrayPush(aSkyline, p);
}
_clear:
taosArrayDestroy(aTmpSkyline);
taosArrayDestroy(pSkyline);
return code;
}
/*
int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
int32_t code = 0;
SDelData *pDelData;
int32_t midx;
@ -1033,6 +1168,7 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
return code;
}
*/
// SBlockData ======================================================
int32_t tBlockDataCreate(SBlockData *pBlockData) {

View File

@ -704,9 +704,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
SDiskbasedBuf* pBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);

View File

@ -218,10 +218,7 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;
} else if (status == PROJECT_RETRIEVE_DONE) {
size_t rows = pBlock->info.rows;
pExchangeInfo->limitInfo.numOfOutputRows += rows;
if (rows == 0) {
if (pBlock->info.rows == 0) {
setOperatorCompleted(pOperator);
return NULL;
} else {
@ -707,6 +704,8 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
}
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pLimitInfo->remainGroupOffset > 0) {
if (pLimitInfo->currentGroupId == 0) { // it is the first group
pLimitInfo->currentGroupId = pBlock->info.id.groupId;
@ -750,36 +749,20 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
// set current group id
pLimitInfo->currentGroupId = pBlock->info.id.groupId;
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataCleanup(pBlock);
bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
if (pBlock->info.rows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
}
// check for the limitation in each group
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
} else {
if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator);
} else {
// current group limitation is reached, and future blocks of this group need to be discarded.
if (pBlock->info.rows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
return PROJECT_RETRIEVE_DONE;
}
return PROJECT_RETRIEVE_DONE;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
pLimitInfo->slimit.limit != -1) {
if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE;

View File

@ -1755,6 +1755,10 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
pLimitInfo->slimit.offset != -1);
}
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
}
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};

View File

@ -90,7 +90,16 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock;
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pInfo->mergeDataBlocks = false;
} else {
if (!pProjPhyNode->ignoreGroupId) {
pInfo->mergeDataBlocks = false;
} else {
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
}
}
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
@ -181,40 +190,20 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
return PROJECT_RETRIEVE_DONE;
}
// todo refactor
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
SOperatorInfo* pOperator) {
// set current group id
pLimitInfo->currentGroupId = groupId;
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataCleanup(pBlock);
bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
if (pBlock->info.rows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
}
// check for the limitation in each group
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
// TODO: optimize it later when partition by + limit
// all retrieved requirement has been fulfilled, let's finish this
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
} else {
if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator);
} else {
// Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data
// from next group. So let's continue this retrieve process
if (keepRows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
}
}
pLimitInfo->numOfOutputRows += pBlock->info.rows;
return PROJECT_RETRIEVE_DONE;
}

View File

@ -256,12 +256,11 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
}
}
// todo handle the slimit info
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SLimit* pLimit = &pLimitInfo->limit;
const char* id = GET_TASKID(pTaskInfo);
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataEmpty(pBlock);
@ -276,12 +275,14 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
// limit the output rows
int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keep);
pLimitInfo->numOfOutputRows += pBlock->info.rows;
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
return true;
}
pLimitInfo->numOfOutputRows += pBlock->info.rows;
return false;
}
@ -310,8 +311,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->totalRows += pBlock->info.rows;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d, uid:%"PRIu64, GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, pBlockInfo->id.uid);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1;
return TSDB_CODE_SUCCESS;
@ -393,13 +394,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
}
}
bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
if (limitReached) { // set operator flag is done
setOperatorCompleted(pOperator);
}
pCost->totalRows += pBlock->info.rows;
pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows;
return TSDB_CODE_SUCCESS;
}
@ -617,6 +617,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pOperator->status == OP_EXEC_DONE) {
break;
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
@ -628,9 +632,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
@ -2540,7 +2543,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
uint32_t status = 0;
loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
@ -2714,12 +2717,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
}
}
applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
pInfo->limitInfo.numOfOutputRows);
if (limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}

View File

@ -222,6 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
// multi-group case not handle here
SSDataBlock* pBlock = NULL;
while (1) {
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
@ -236,28 +237,14 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
continue;
}
// todo add the limit/offset info
if (pInfo->limitInfo.remainOffset > 0) {
if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) {
pInfo->limitInfo.remainOffset -= pBlock->info.rows;
continue;
}
blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset);
pInfo->limitInfo.remainOffset = 0;
// there are bugs?
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
if (limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
if (pInfo->limitInfo.limit.limit > 0 &&
pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) {
int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows;
blockDataKeepFirstNRows(pBlock, remain);
}
size_t numOfRows = blockDataGetNumOfRows(pBlock);
pInfo->limitInfo.numOfOutputRows += numOfRows;
pOperator->resultInfo.totalRows += numOfRows;
if (numOfRows > 0) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
if (pBlock->info.rows > 0) {
break;
}
}
@ -557,7 +544,6 @@ typedef struct SMultiwayMergeOperatorInfo {
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
int64_t startTs; // sort start time
bool groupSort;
bool hasGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
} SMultiwayMergeOperatorInfo;
@ -604,7 +590,9 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, SSDataBlock* p) {
static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
SSDataBlock* p, bool* newgroup) {
*newgroup = false;
while (1) {
STupleHandle* pTupleHandle = NULL;
@ -613,8 +601,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
uint64_t gid = tsortGetGroupId(pTupleHandle);
if (gid != pInfo->groupId) {
*newgroup = true;
pInfo->groupId = gid;
}
}
} else {
pTupleHandle = tsortNextTuple(pHandle);
@ -627,12 +619,10 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
if (pInfo->groupSort) {
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (!pInfo->hasGroupId) {
if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
p->info.id.groupId = tupleGroupId;
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
@ -645,11 +635,6 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
break;
}
}
if (pInfo->groupSort) {
pInfo->hasGroupId = false;
}
}
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo,
@ -673,14 +658,19 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
}
SSDataBlock* p = pInfo->pIntermediateBlock;
bool newgroup = false;
while (1) {
doGetSortedBlockData(pInfo, pHandle, capacity, p);
doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
if (p->info.rows == 0) {
break;
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
if (newgroup) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
if (limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}

View File

@ -447,38 +447,6 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t
taosResetFillInfo(pFillInfo, skey);
switch (fillType) {
case FILL_MODE_NONE:
pFillInfo->type = TSDB_FILL_NONE;
break;
case FILL_MODE_PREV:
pFillInfo->type = TSDB_FILL_PREV;
break;
case FILL_MODE_NULL:
pFillInfo->type = TSDB_FILL_NULL;
break;
case FILL_MODE_NULL_F:
pFillInfo->type = TSDB_FILL_NULL_F;
break;
case FILL_MODE_LINEAR:
pFillInfo->type = TSDB_FILL_LINEAR;
break;
case FILL_MODE_NEXT:
pFillInfo->type = TSDB_FILL_NEXT;
break;
case FILL_MODE_VALUE:
pFillInfo->type = TSDB_FILL_SET_VALUE;
break;
case FILL_MODE_VALUE_F:
pFillInfo->type = TSDB_FILL_SET_VALUE_F;
break;
default: {
taosMemoryFree(pFillInfo);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
}
pFillInfo->type = fillType;
pFillInfo->pFillCol = pCol;
pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols;

View File

@ -435,6 +435,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
@ -533,6 +538,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
// restore the value
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
if (pResBlock->info.rows == 0) {
@ -568,6 +575,11 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
}
}
code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(&pOperator->resultInfo, 4096);
@ -624,6 +636,7 @@ void destroyTimeSliceOperatorInfo(void* param) {
taosMemoryFree(pKey->end.val);
}
taosArrayDestroy(pInfo->pLinearInfo);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFree(pInfo->pFillColInfo);
taosMemoryFreeClear(param);

View File

@ -714,26 +714,18 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
pBuf->type = type;
if (IS_NULL_TYPE(type)) {
numOfElems = 0;
goto _over;
}
// data in current data block are qualified to the query
if (pInput->colDataSMAIsSet) {
numOfElems = pInput->numOfRows - pAgg->numOfNull;
if (numOfElems == 0) {
goto _over;
}
void* tval = NULL;
int16_t index = 0;
if (isMinFunc) {
tval = &pInput->pColumnDataAgg[0]->min;
} else {
tval = &pInput->pColumnDataAgg[0]->max;
}
void* tval = (isMinFunc) ? &pInput->pColumnDataAgg[0]->min : &pInput->pColumnDataAgg[0]->max;
if (!pBuf->assign) {
if (type == TSDB_DATA_TYPE_FLOAT) {
@ -824,8 +816,9 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
}
}
numOfElems = 1;
pBuf->assign = true;
return TSDB_CODE_SUCCESS;
goto _over;
}
int32_t start = pInput->startRowIndex;

View File

@ -262,6 +262,13 @@ bool fmIsGroupKeyFunc(int32_t funcId) {
return FUNCTION_TYPE_GROUP_KEY == funcMgtBuiltins[funcId].type;
}
bool fmIsBlockDistFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
}
return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type;
}
void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {

View File

@ -22,11 +22,11 @@ struct SToken;
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
#define NEXT_TOKEN(pSql, sToken) \
do { \
int32_t index = 0; \
sToken = tStrGetToken(pSql, &index, false); \
pSql += index; \
#define NEXT_TOKEN(pSql, sToken) \
do { \
int32_t index = 0; \
sToken = tStrGetToken(pSql, &index, false, NULL); \
pSql += index; \
} while (0)
#define CHECK_CODE(expr) \

View File

@ -55,7 +55,7 @@ uint32_t tGetToken(const char *z, uint32_t *tokenType);
* @param isPrevOptr
* @return
*/
SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr);
SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr, bool *pIgnoreComma);
/**
* check if it is a keyword or not

View File

@ -18,16 +18,23 @@
#include "tglobal.h"
#include "ttime.h"
#define NEXT_TOKEN_WITH_PREV(pSql, token) \
do { \
int32_t index = 0; \
token = tStrGetToken(pSql, &index, true); \
pSql += index; \
#define NEXT_TOKEN_WITH_PREV(pSql, token) \
do { \
int32_t index = 0; \
token = tStrGetToken(pSql, &index, true, NULL); \
pSql += index; \
} while (0)
#define NEXT_TOKEN_KEEP_SQL(pSql, token, index) \
do { \
token = tStrGetToken(pSql, &index, false); \
#define NEXT_TOKEN_WITH_PREV_EXT(pSql, token, pIgnoreComma) \
do { \
int32_t index = 0; \
token = tStrGetToken(pSql, &index, true, pIgnoreComma); \
pSql += index; \
} while (0)
#define NEXT_TOKEN_KEEP_SQL(pSql, token, index) \
do { \
token = tStrGetToken(pSql, &index, false, NULL); \
} while (0)
#define NEXT_VALID_TOKEN(pSql, token) \
@ -302,12 +309,12 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
* e.g., now+12a, now-5h
*/
index = 0;
SToken token = tStrGetToken(pTokenEnd, &index, false);
SToken token = tStrGetToken(pTokenEnd, &index, false, NULL);
pTokenEnd += index;
if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) {
index = 0;
SToken valueToken = tStrGetToken(pTokenEnd, &index, false);
SToken valueToken = tStrGetToken(pTokenEnd, &index, false, NULL);
pTokenEnd += index;
if (valueToken.n < 2) {
@ -1240,30 +1247,35 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB
int32_t code = tdSRowResetBuf(pBuilder, row);
// 1. set the parsed value from sql string
for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
NEXT_TOKEN_WITH_PREV(*pSql, *pToken);
SSchema* pSchema = &pSchemas[pCols->boundColumns[i]];
const char* pOrigSql = *pSql;
bool ignoreComma = false;
NEXT_TOKEN_WITH_PREV_EXT(*pSql, *pToken, &ignoreComma);
if (ignoreComma) {
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql);
break;
}
if (pToken->type == TK_NK_QUESTION) {
isParseBindParam = true;
if (NULL == pCxt->pComCxt->pStmtCb) {
code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
break;
}
} else {
if (TK_NK_RP == pToken->type) {
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
break;
}
continue;
}
if (TSDB_CODE_SUCCESS == code && TK_NK_RP == pToken->type) {
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
}
if (isParseBindParam) {
code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
break;
}
if (TSDB_CODE_SUCCESS == code && isParseBindParam) {
code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
}
if (TSDB_CODE_SUCCESS == code) {
param.schema = pSchema;
param.schema = &pSchemas[pCols->boundColumns[i]];
insGetSTSRowAppendInfo(pBuilder->rowType, pCols, i, &param.toffset, &param.colIdx);
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pDataBuf->pTableMeta).precision, insMemRowAppend,
&param);
code = parseValueToken(pCxt, pSql, pToken, param.schema, getTableInfo(pDataBuf->pTableMeta).precision,
insMemRowAppend, &param);
}
if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {

View File

@ -47,7 +47,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_APP_ERROR;
@ -137,7 +137,8 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
}
SVCreateTbReq tbReq = {0};
insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags,
TSDB_DEFAULT_TABLE_TTL);
code = insBuildCreateTbMsg(pDataBlock, &tbReq);
tdDestroySVCreateTbReq(&tbReq);

View File

@ -622,7 +622,7 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
return 0;
}
SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreComma) {
SToken t0 = {0};
// here we reach the end of sql string, null-terminated string
@ -643,6 +643,10 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
return t0;
}
if (NULL != pIgnoreComma && t == ',') {
*pIgnoreComma = true;
}
t = str[++(*i)];
}

View File

@ -1550,11 +1550,34 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
// select percentile() without from clause is also valid
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) ||
NULL != pSelect->pPartitionByList) {
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
if (NULL != pSelect->pPartitionByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function is not supported in partition query", pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateBlockDistFunc(STranslateContext* pCtx, SFunctionNode* pFunc) {
if (!fmIsBlockDistFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCtx->pCurrStmt)) {
return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
SSelectStmt* pSelect = (SSelectStmt*)pCtx->pCurrStmt;
SNode* pTable = pSelect->pFromTable;
if (NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_SUPER_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) {
return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s is only supported on super table, child table or normal table", pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
@ -1717,7 +1740,7 @@ static int32_t rewriteSystemInfoFunc(STranslateContext* pCxt, SNode** pNode) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
static int32_t translateNormalFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
int32_t code = translateAggFunc(pCxt, pFunc);
if (TSDB_CODE_SUCCESS == code) {
code = translateScanPseudoColumnFunc(pCxt, pFunc);
@ -1749,6 +1772,9 @@ static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* p
if (TSDB_CODE_SUCCESS == code) {
code = translateTimelineFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateBlockDistFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
setFuncClassification(pCxt->pCurrStmt, pFunc);
}
@ -1809,7 +1835,7 @@ static int32_t translateFunctionImpl(STranslateContext* pCxt, SFunctionNode** pF
if (fmIsClientPseudoColumnFunc((*pFunc)->funcId)) {
return rewriteClientPseudoColumnFunc(pCxt, (SNode**)pFunc);
}
return translateNoramlFunction(pCxt, *pFunc);
return translateNormalFunction(pCxt, *pFunc);
}
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc) {
@ -2975,7 +3001,7 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode*
intervalRange = pInterval->datum.i;
}
if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
if ((timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
}

View File

@ -27,7 +27,7 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
const char* pSql = pStr;
int32_t index = 0;
SToken t = tStrGetToken((char*)pStr, &index, false);
SToken t = tStrGetToken((char*)pStr, &index, false, NULL);
if (TK_INSERT != t.type && TK_IMPORT != t.type) {
return false;
}
@ -35,7 +35,7 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
do {
pStr += index;
index = 0;
t = tStrGetToken((char*)pStr, &index, false);
t = tStrGetToken((char*)pStr, &index, false, NULL);
if (TK_USING == t.type || TK_VALUES == t.type || TK_FILE == t.type) {
return true;
} else if (TK_SELECT == t.type) {

View File

@ -1016,7 +1016,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
TSWAP(pProject->node.pLimit, pSelect->pLimit);
TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
pProject->ignoreGroupId = (NULL == pSelect->pPartitionByList);
pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList);
pProject->node.groupAction =
(!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;

View File

@ -348,7 +348,8 @@ static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
return false;
}
if (NULL != pAgg->pGroupKeys) {
return stbSplHasPartTbname(pAgg->pGroupKeys) && stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
return stbSplHasPartTbname(pAgg->pGroupKeys) &&
stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
}
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}
@ -559,6 +560,8 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
if (NULL == pMerge->node.pLimit) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
((SLimitNode*)pSplitNode->pLimit)->limit += ((SLimitNode*)pSplitNode->pLimit)->offset;
((SLimitNode*)pSplitNode->pLimit)->offset = 0;
}
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pSubplan) {
@ -1025,21 +1028,29 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code;
}
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = pInfo->pSplitNode;
static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) {
*pSplitNode = pInfo->pSplitNode;
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
pSplitNode = pInfo->pSplitNode->pParent;
*pSplitNode = pInfo->pSplitNode->pParent;
if (NULL != pInfo->pSplitNode->pLimit) {
pSplitNode->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit);
if (NULL == pSplitNode->pLimit) {
(*pSplitNode)->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit);
if (NULL == (*pSplitNode)->pLimit) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SLimitNode*)pInfo->pSplitNode->pLimit)->limit += ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset;
((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0;
}
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
return TSDB_CODE_SUCCESS;
}
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = NULL;
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
if (TSDB_CODE_SUCCESS == code) {
code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
@ -1049,12 +1060,11 @@ static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSp
}
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = pInfo->pSplitNode;
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
pSplitNode = pInfo->pSplitNode->pParent;
SLogicNode* pSplitNode = NULL;
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
}
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));

View File

@ -24,7 +24,7 @@ extern "C" {
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF

View File

@ -830,7 +830,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
pMgr->endIndex = index + 1;
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term: %" PRId64 ". mgr (rs:%d): [%" PRId64
sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term: %" PRId64 ". mgr (rs:%d): [%" PRId64
" %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

View File

@ -112,7 +112,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
// event log
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
@ -379,7 +379,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
}
pReceiver->start = true;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm;
pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime;
@ -510,16 +510,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal;
bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
int64_t walCommitVer = walGetCommittedVer(pWal);
if (!isEmpty && ths->commitIndex != walCommitVer) {
sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer,
ths->commitIndex);
snapStart = walCommitVer + 1;
} else {
snapStart = ths->commitIndex + 1;
}
snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;
sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
}
@ -527,7 +519,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
return snapStart;
}
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
@ -565,7 +557,7 @@ _START_RECEIVER:
} else {
// waiting for clock match
while (timeNow < pMsg->startTime) {
sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
timeNow = taosGetTimestampMs();
@ -765,7 +757,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT
// if receiver already start
// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
// if sender.start-time = receiver.start-time, maybe duplicate msg
@ -809,9 +801,9 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->raftStore.currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
code = syncNodeOnSnapshotPre(pSyncNode, pMsg);
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
@ -848,7 +840,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return code;
}
static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
@ -945,8 +937,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pMsg->startTime != pSender->startTime) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime,
pSender->startTime, pMsg->code);
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime,
pSender->startTime, tstrerror(pMsg->code), pMsg->code);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR;
}
@ -961,15 +953,15 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pMsg->code != 0) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
terrno = pMsg->code;
goto _ERROR;
}
// prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
}
if (pSender->pReader == NULL || pSender->finish) {

View File

@ -141,20 +141,15 @@ static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bu
}
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int32_t len = 1;
int32_t len = 0;
len += snprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
SPeerState* pState = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[i]));
if (pState == NULL) break;
if (i < pSyncNode->replicaNum - 1) {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 ", ", i, pState->lastSendIndex,
pState->lastSendTime);
} else {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "}", i, pState->lastSendIndex,
pState->lastSendTime);
}
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "%s", i, pState->lastSendIndex,
pState->lastSendTime, (i < pSyncNode->replicaNum - 1) ? ", " : "");
}
len += snprintf(buf + len, bufLen - len, "%s", "}");
}
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
@ -245,7 +240,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
char cfgStr[1024] = "";
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
char peerStr[1024] = "{";
char peerStr[1024] = "";
syncPeerState2Str(pNode, peerStr, sizeof(peerStr));
char eventLog[512]; // {0};
@ -255,20 +250,21 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
va_end(argpointer);
taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64
" lcindex:%" PRId64
" seq:%d ack:%d finish:%d replica-index:%d dnode:%d}"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
"vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64
" last-term:%" PRIu64 " last-cfg:%" PRId64
", seq:%d ack:%d finish:%d, as:%d dnode:%d}"
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex,
pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
@ -291,7 +287,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
char cfgStr[1024] = "";
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
char peerStr[1024] = "{";
char peerStr[1024] = "";
syncPeerState2Str(pNode, peerStr, sizeof(peerStr));
char eventLog[512]; // {0};
@ -300,22 +296,22 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
va_end(argpointer);
taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s,"
" {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from dnode:%d s-param:%" PRId64
" e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
"}"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack,
pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start,
pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
pReceiver->snapshot.lastConfigIndex, pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
taosPrintLog(
flags, level, dflag,
"vgId:%d, %s, sync:%s,"
" snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
"}"
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term,
pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
@ -351,13 +347,13 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
int64_t execTime) {
if (printX) {
sNTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, x",
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, x",
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
} else {
sNTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
execTime);
}
@ -368,14 +364,14 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
pSyncNode->hbSlowNum++;
sNInfo(pSyncNode,
"recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64
"recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
}
sNTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, %s, net elapsed:%" PRId64,
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
}
@ -400,67 +396,64 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
sNDebug(pSyncNode,
"send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64,
"send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
DID(&pMsg->destId), s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
pMsg->startTime);
}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
sNDebug(pSyncNode,
"recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64 ", len:%u",
"recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64 ", data-len:%u",
DID(&pMsg->srcId), s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
pMsg->startTime, pMsg->dataLen);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
sNDebug(pSyncNode,
"send sync-snapshot-rsp to dnode:%d, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64,
"send sync-snapshot-rsp to dnode:%d, %s, acked:%d, term:%" PRId64 ", begin-index:%" PRId64
", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
DID(&pMsg->destId), s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
pMsg->startTime);
}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
sNDebug(pSyncNode,
"recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64,
"recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin-index:%" PRId64
", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
DID(&pMsg->srcId), s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
pMsg->startTime);
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
sNTrace(pSyncNode,
"recv sync-append-entries from dnode:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
DID(&pMsg->srcId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
"recv sync-append-entries from dnode:%d {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
"}, commit-index:%" PRId64 ", datalen:%d}, %s",
DID(&pMsg->srcId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->dataLen, s);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
sNTrace(pSyncNode,
"send sync-append-entries to dnode:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
"send sync-append-entries to dnode:%d, {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
"}, index:%" PRId64 ", commit-index:%" PRId64 ", datalen:%d}, %s",
DID(&pMsg->destId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1),
pMsg->commitIndex, pMsg->dataLen, s);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) {
if (voteGranted == -1) {
sNInfo(pSyncNode,
"recv sync-request-vote from dnode:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
} else {
sNInfo(pSyncNode,
"recv sync-request-vote from dnode:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64
"}, granted:%d",
DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted,
const char* errmsg) {
char statusMsg[64];
snprintf(statusMsg, sizeof(statusMsg), "granted:%d", voteGranted);
sNInfo(pSyncNode,
"recv sync-request-vote from dnode:%d, {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm,
(voteGranted != -1) ? statusMsg : errmsg);
}
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
sNInfo(pNode, "send sync-request-vote to dnode:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
sNInfo(pNode,
"send sync-request-vote to dnode:%d {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
DID(&pMsg->destId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}

View File

@ -457,6 +457,11 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
SPgno journalSize = 0;
int ret;
if (pTxn->jfd == 0) {
// txn is commited
return 0;
}
// sync the journal file
ret = tdbOsFSync(pTxn->jfd);
if (ret < 0) {

View File

@ -1045,6 +1045,12 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
if (v == NULL) {
addr = taosGetIpv4FromFqdn(fqdn);
if (addr == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
return addr;
}
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
} else {
addr = *v;
@ -1116,9 +1122,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
if (ipaddr == 0xffffffff) {
uv_timer_stop(conn->timer);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExcept(conn);
return;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
STraceId* trace = &(pMsg->msg.info.traceId);

View File

@ -944,7 +944,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0) {
// printf("WSAStartup failed: %d\n", iResult);
return 1;
return 0xFFFFFFFF;
}
#endif
struct addrinfo hints = {0};

48
tests/ci/Dockerfile Normal file
View File

@ -0,0 +1,48 @@
FROM python:3.8
RUN pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip3 install pandas psutil fabric2 requests faker simplejson toml pexpect tzlocal distro
RUN apt-get update
RUN apt-get install -y psmisc sudo tree libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config build-essential valgrind \
vim libjemalloc-dev openssh-server screen sshpass net-tools dirmngr gnupg apt-transport-https ca-certificates software-properties-common r-base iputils-ping
RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/'
RUN apt install -y r-base
ADD go1.17.6.linux-amd64.tar.gz /usr/local/
ADD jdk-8u144-linux-x64.tar.gz /usr/local/
ADD apache-maven-3.8.4-bin.tar.gz /usr/local/
RUN apt-get install wget -y \
&& wget https://packages.microsoft.com/config/ubuntu/18.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb \
&& dpkg -i packages-microsoft-prod.deb \
&& rm packages-microsoft-prod.deb \
&& apt-get update && apt-get install -y dotnet-sdk-5.0 && apt-get install -y dotnet-sdk-6.0
ADD node-v12.20.0-linux-x64.tar.gz /usr/local/
RUN sh -c "rm -f /etc/localtime;ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime;echo \"Asia/Shanghai\" >/etc/timezone"
COPY id_rsa /root/.ssh/id_rsa
COPY .m2 /root/.m2
COPY .nuget /root/.nuget
COPY .dotnet /root/.dotnet
COPY .cargo /root/.cargo
COPY go /root/go
ADD cmake-3.21.5-linux-x86_64.tar.gz /usr/local/
RUN echo " export RUSTUP_DIST_SERVER=\"https://rsproxy.cn\" " >> /root/.bashrc
RUN echo " export RUSTUP_UPDATE_ROOT=\"https://rsproxy.cn/rustup\" " >> /root/.bashrc
RUN curl https://sh.rustup.rs -o /tmp/rustup-init.sh
RUN sh /tmp/rustup-init.sh -y
ENV PATH /usr/local/go/bin:/usr/local/node-v12.20.0-linux-x64/bin:/usr/local/apache-maven-3.8.4/bin:/usr/local/jdk1.8.0_144/bin:/usr/local/cmake-3.21.5-linux-x86_64/bin:/root/.cargo/bin:$PATH
ENV JAVA_HOME /usr/local/jdk1.8.0_144
RUN go env -w GOPROXY=https://goproxy.cn
RUN echo "StrictHostKeyChecking no" >>/etc/ssh/ssh_config
RUN npm config -g set unsafe-perm
RUN npm config -g set registry https://registry.npm.taobao.org
COPY .npm /root/.npm
RUN R CMD javareconf JAVA_HOME=${JAVA_HOME} JAVA=${JAVA_HOME}/bin/java JAVAC=${JAVA_HOME}/bin/javac JAVAH=${JAVA_HOME}/bin/javah JAR=${JAVA_HOME}/bin/jar
RUN echo "install.packages(\"RJDBC\", repos=\"http://cran.us.r-project.org\")"|R --no-save
COPY .gitconfig /root/.gitconfig
RUN mkdir -p /run/sshd
COPY id_rsa.pub /root/.ssh/id_rsa.pub
COPY id_rsa.pub /root/.ssh/authorized_keys
RUN pip3 uninstall -y taostest
COPY repository/TDinternal /home/TDinternal
COPY repository/taos-connector-python /home/taos-connector-python
RUN sh -c "cd /home/taos-connector-python; pip3 install ."
COPY setup.sh /home/setup.sh

4
tests/ci/build_image.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
docker build --no-cache -t taos_test:v1.0 .

60
tests/ci/daily_build_image.sh Executable file
View File

@ -0,0 +1,60 @@
#!/bin/bash
set -x
script_dir=`dirname $0`
cd $script_dir
script_dir=`pwd`
cd $script_dir/repository/taos-connector-python
git pull
cd $script_dir/repository/TDinternal
git clean -fxd
git pull
cd $script_dir/repository/TDinternal/community
git clean -fxd
git checkout main
git pull origin main
git submodule update --init --recursive
cd $script_dir
cp $script_dir/repository/TDinternal/community/tests/ci/build_image.sh .
cp $script_dir/repository/TDinternal/community/tests/ci/daily_build_image.sh .
./build_image.sh || exit 1
docker image prune -f
ips="\
192.168.1.47 \
192.168.1.48 \
192.168.1.49 \
192.168.1.52 \
192.168.0.215 \
192.168.0.217 \
192.168.0.219 \
"
image=taos_image.tar
docker save taos_test:v1.0 -o $image
for ip in $ips; do
echo "scp $image root@$ip:/home/ &"
scp $image root@$ip:/home/ &
done
wait
for ip in $ips; do
echo "ssh root@$ip docker load -i /home/$image &"
ssh root@$ip docker load -i /home/$image &
done
wait
for ip in $ips; do
echo "ssh root@$ip rm -f /home/$image &"
ssh root@$ip rm -f /home/$image &
done
wait
rm -rf taos_image.tar

View File

@ -178,6 +178,7 @@
,,y,script,./test.sh -f tsim/query/udf_with_const.sim
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/forceFill.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim

View File

@ -91,6 +91,10 @@ print ============== TD-5998
sql_error select _block_dist() from (select * from $nt)
sql_error select _block_dist() from (select * from $mt)
print ============== TD-22140 & TD-22165
sql_error show table distributed information_schema.ins_databases
sql_error show table distributed performance_schema.perf_apps
print =============== clear
sql drop database $db
sql select * from information_schema.ins_databases

View File

@ -72,4 +72,33 @@ sql_error select interp(*) from nt5931 where ts=now
sql_error select interp(*) from st5931 where ts=now
sql_error select interp(*) from ct5931 where ts=now
sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int);
sql create table tba1 using sta tags(1);
sql insert into tba1 values ('2022-04-26 15:15:01', -3.0, "a");
sql insert into tba1 values ('2022-04-26 15:15:05', 3.0, "b");
sql select a from (select interp(f1) as a from tba1 where ts >= '2022-04-26 15:15:01' and ts <= '2022-04-26 15:15:05' range('2022-04-26 15:15:01','2022-04-26 15:15:05') every(1s) fill(linear)) where a > 0;
if $rows != 2 then
return -1
endi
if $data00 != 1.500000000 then
return -1
endi
if $data10 != 3.000000000 then
return -1
endi
sql select a from (select interp(f1+1) as a from tba1 where ts >= '2022-04-26 15:15:01' and ts <= '2022-04-26 15:15:05' range('2022-04-26 15:15:01','2022-04-26 15:15:05') every(1s) fill(linear)) where a > 0;
if $rows != 3 then
return -1
endi
if $data00 != 1.000000000 then
return -1
endi
if $data10 != 2.500000000 then
return -1
endi
if $data20 != 4.000000000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -31,13 +31,8 @@ sql insert into $tb values ( $ts , $x )
$x = $x + 1
endw
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sql flush database $db
sql use $db
sql delete from $tb where ts=1537146000000
sql delete from $tb where ts=1537146409500
@ -63,7 +58,6 @@ if $rows != 8198 then
return -1
endi
print ===========================> TD-22077 && TD-21877
sql drop database if exists $db -x step1
sql create database $db vgroups 1;
@ -88,6 +82,8 @@ endw
sql flush database $db
print ===========================> TD-22077 && TD-21877
sql insert into t1 values('2018-09-17 09:00:26', 26);
sql insert into t2 values('2018-09-17 09:00:25', 25);
@ -97,4 +93,33 @@ sql flush database reg_db0;
sql delete from st1 where ts<='2018-9-17 09:00:26';
sql select * from st1;
sql drop table t1
sql drop table t2
print =========================================>TD-22196
sql create table t1 using st1 tags(1);
$i = 0
$ts = 1674977959000
$rowNum = 200
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
sql insert into t1 values ( $ts , $x )
$x = $x + 1
$ts = $ts + 1000
endw
sql flush database $db
sql select min(c),max(c) from t1
if $data00 != 0 then
return -1
endi
if $data01 != 199 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,367 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists db1;
sql create database db1 vgroups 10;
sql use db1;
sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int);
sql create table tba1 using sta tags(1);
sql insert into tba1 values ('2022-04-26 15:15:01', 1.0, "a");
sql insert into tba1 values ('2022-04-26 15:15:02', 2.0, "b");
sql insert into tba1 values ('2022-04-26 15:15:04', 4.0, "b");
sql insert into tba1 values ('2022-04-26 15:15:05', 5.0, "b");
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:00' and ts <= '2022-04-26 15:15:06' interval(1s) fill(value_f, 8.8);
if $rows != 7 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != 8.800000000 then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != 8.800000000 then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:00' and ts <= '2022-04-26 15:15:06' interval(1s) fill(value, 8.8);
if $rows != 7 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != 8.800000000 then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != 8.800000000 then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:00' and ts <= '2022-04-26 15:15:06' interval(1s) fill(null);
if $rows != 7 then
return -1
endi
if $data00 != NULL then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != NULL then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:00' and ts <= '2022-04-26 15:15:06' interval(1s) fill(null_f);
if $rows != 7 then
return -1
endi
if $data00 != NULL then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != NULL then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:06' and ts <= '2022-04-26 15:15:10' interval(1s) fill(value, 8.8);
if $rows != 0 then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:06' and ts <= '2022-04-26 15:15:10' interval(1s) fill(value_f, 8.8);
if $rows != 5 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
if $data10 != 8.800000000 then
return -1
endi
if $data20 != 8.800000000 then
return -1
endi
if $data30 != 8.800000000 then
return -1
endi
if $data40 != 8.800000000 then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:06' and ts <= '2022-04-26 15:15:10' interval(1s) fill(null);
if $rows != 0 then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:15:06' and ts <= '2022-04-26 15:15:10' interval(1s) fill(null_f);
if $rows != 5 then
return -1
endi
if $data00 != NULL then
return -1
endi
if $data10 != NULL then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != NULL then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:16:00' and ts <= '2022-04-26 19:15:59' interval(1s) fill(value_f, 8.8);
if $rows != 14400 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
sql select avg(f1) from tba1 where ts >= '2022-04-26 15:16:00' and ts <= '2022-04-26 19:15:59' interval(1s) fill(null_f);
if $rows != 14400 then
return -1
endi
if $data00 != NULL then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:00','2022-04-26 15:15:06') every(1s) fill(value_f, 8.8);
if $rows != 7 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != 8.800000000 then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != 8.800000000 then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:00','2022-04-26 15:15:06') every(1s) fill(value, 8.8);
if $rows != 7 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != 8.800000000 then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != 8.800000000 then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:00','2022-04-26 15:15:06') every(1s) fill(null);
if $rows != 7 then
return -1
endi
if $data00 != NULL then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != NULL then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:00','2022-04-26 15:15:06') every(1s) fill(null_f);
if $rows != 7 then
return -1
endi
if $data00 != NULL then
return -1
endi
if $data10 != 1.000000000 then
return -1
endi
if $data20 != 2.000000000 then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != 4.000000000 then
return -1
endi
if $data50 != 5.000000000 then
return -1
endi
if $data60 != NULL then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:06','2022-04-26 15:15:10') every(1s) fill(value, 8.8);
if $rows != 5 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
if $data10 != 8.800000000 then
return -1
endi
if $data20 != 8.800000000 then
return -1
endi
if $data30 != 8.800000000 then
return -1
endi
if $data40 != 8.800000000 then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:06','2022-04-26 15:15:10') every(1s) fill(value_f, 8.8);
if $rows != 5 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
if $data10 != 8.800000000 then
return -1
endi
if $data20 != 8.800000000 then
return -1
endi
if $data30 != 8.800000000 then
return -1
endi
if $data40 != 8.800000000 then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:06','2022-04-26 15:15:10') every(1s) fill(null);
if $rows != 5 then
return -1
endi
if $data00 != NULL then
return -1
endi
if $data10 != NULL then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != NULL then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:15:06','2022-04-26 15:15:10') every(1s) fill(null_f);
if $rows != 5 then
return -1
endi
if $data00 != NULL then
return -1
endi
if $data10 != NULL then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != NULL then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:16:00','2022-04-26 19:15:59') every(1s) fill(value_f, 8.8);
if $rows != 14400 then
return -1
endi
if $data00 != 8.800000000 then
return -1
endi
sql select interp(f1) from tba1 range('2022-04-26 15:16:00','2022-04-26 19:15:59') every(1s) fill(null_f);
if $rows != 14400 then
return -1
endi
if $data00 != NULL then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -14,6 +14,7 @@ sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));;
sql create stream streams1 trigger at_once into streamt as select _wstart ts, count(*) c1 from t1 where ts > 1648791210000 and ts < 1648791413000 interval(10s) fill(value, 100);
sql create stream streams1a trigger at_once into streamta as select _wstart ts, count(*) c1 from t1 where ts > 1648791210000 and ts < 1648791413000 interval(10s) fill(value_f, 100);
sql insert into t1 values(1648791213000,1,2,3,1.0,'aaa');
sleep 100
sql insert into t1 values(1648791233000,1,2,3,1.0,'aaa');
@ -77,6 +78,69 @@ if $data71 != 1 then
goto loop0
endi
print "force fill vaule"
$loop_count = 0
loop0a:
sleep 200
sql select * from streamta order by ts;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 8 then
print =====rows=$rows
goto loop0a
endi
if $data01 != 1 then
print =====data01=$data01
goto loop0a
endi
if $data11 != 1 then
print =====data11=$data11
goto loop0a
endi
if $data21 != 1 then
print =====data21=$data21
goto loop0a
endi
if $data31 != 100 then
print =====data31=$data31
goto loop0a
endi
if $data41 != 1 then
print =====data41=$data41
goto loop0a
endi
if $data51 != 100 then
print =====data01=$data01
goto loop0a
endi
if $data61 != 100 then
print =====data61=$data61
goto loop0a
endi
if $data71 != 1 then
print =====data71=$data71
goto loop0a
endi
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 1;
@ -408,6 +472,7 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once into streamt4 as select _wstart ts, count(*) c1, concat(tbname, 'aaa') as pname, timezone() from st where ts > 1648791000000 and ts < 1648793000000 partition by tbname interval(10s) fill(NULL);
sql create stream streams4a trigger at_once into streamt4a as select _wstart ts, count(*) c1, concat(tbname, 'aaa') as pname, timezone() from st where ts > 1648791000000 and ts < 1648793000000 partition by tbname interval(10s) fill(NULL_F);
sql insert into t1 values(1648791213000,1,2,3,1.0,'aaa');
sql insert into t1 values(1648791233000,1,2,3,1.0,'aaa');
sql insert into t1 values(1648791273000,1,2,3,1.0,'aaa');
@ -512,32 +577,104 @@ if $data[12][3] == NULL then
goto loop4
endi
print "force fill null"
$loop_count = 0
loop4a:
sleep 200
sql select * from streamt4a order by pname, ts;
print ===> $data[0][0] , $data[0][1] , $data[0][2] , $data[0][3]
print ===> $data[1][0] , $data[1][1] , $data[1][2] , $data[1][3]
print ===> $data[2][0] , $data[2][1] , $data[2][2] , $data[2][3]
print ===> $data[3][0] , $data[3][1] , $data[3][2] , $data[3][3]
print ===> $data[4][0] , $data[4][1] , $data[4][2] , $data[4][3]
print ===> $data[5][0] , $data[5][1] , $data[5][2] , $data[5][3]
print ===> $data[6][0] , $data[6][1] , $data[6][2] , $data[6][3]
print ===> $data[7][0] , $data[7][1] , $data[7][2] , $data[7][3]
print ===> $data[8][0] , $data[8][1] , $data[8][2] , $data[8][3]
print ===> $data[9][0] , $data[9][1] , $data[9][2] , $data[9][3]
print ===> $data[10][0] , $data[10][1] , $data[10][2] , $data[10][3]
print ===> $data[11][0] , $data[11][1] , $data[11][2] , $data[11][3]
print ===> $data[12][0] , $data[12][1] , $data[12][2] , $data[12][3]
print ===> $data[13][0] , $data[13][1] , $data[13][2] , $data[13][3]
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 14 then
print =====rows=$rows
goto loop4a
endi
if $data11 != NULL then
print =====data11=$data11
goto loop4a
endi
if $data12 != t1aaa then
print =====data12=$data12
goto loop4a
endi
if $data13 == NULL then
print =====data13=$data13
goto loop4a
endi
if $data32 != t1aaa then
print =====data32=$data32
goto loop4a
endi
if $data42 != t1aaa then
print =====data42=$data42
goto loop4a
endi
if $data52 != t1aaa then
print =====data52=$data52
goto loop4a
endi
if $data81 != NULL then
print =====data81=$data81
goto loop4a
endi
if $data82 != t2aaa then
print =====data82=$data82
goto loop4a
endi
if $data83 == NULL then
print =====data83=$data83
goto loop4a
endi
if $data[10][2] != t2aaa then
print =====data[10][2]=$data[10][2]
goto loop4a
endi
if $data[11][2] != t2aaa then
print =====data[11][2]=$data[11][2]
goto loop4a
endi
if $data[12][2] != t2aaa then
print =====data[12][2]=$data[12][2]
goto loop4a
endi
if $data[12][3] == NULL then
print =====data[12][3]=$data[12][3]
goto loop4a
endi
@ -584,4 +721,4 @@ print ============loop_all=$loop_all
system sh/stop_dnodes.sh
#goto looptest
#goto looptest