Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-tmq
This commit is contained in:
commit
e3acb10acd
|
@ -1332,6 +1332,8 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
|
if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
|
||||||
|
uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock);
|
||||||
|
uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock);
|
||||||
taosMemoryFreeClear(pBlock->info.pks[0].pData);
|
taosMemoryFreeClear(pBlock->info.pks[0].pData);
|
||||||
taosMemoryFreeClear(pBlock->info.pks[1].pData);
|
taosMemoryFreeClear(pBlock->info.pks[1].pData);
|
||||||
}
|
}
|
||||||
|
@ -1483,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
|
|
||||||
SSDataBlock* pBlock = createDataBlock();
|
SSDataBlock* pBlock = createDataBlock();
|
||||||
pBlock->info = pDataBlock->info;
|
pBlock->info = pDataBlock->info;
|
||||||
|
|
||||||
pBlock->info.rows = 0;
|
pBlock->info.rows = 0;
|
||||||
pBlock->info.capacity = 0;
|
pBlock->info.capacity = 0;
|
||||||
pBlock->info.rowSize = 0;
|
pBlock->info.rowSize = 0;
|
||||||
|
@ -1503,11 +1506,17 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
pVal->type = pDataBlock->info.pks[0].type;
|
pVal->type = pDataBlock->info.pks[0].type;
|
||||||
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
|
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
|
||||||
pVal->nData = pDataBlock->info.pks[0].nData;
|
pVal->nData = pDataBlock->info.pks[0].nData;
|
||||||
|
memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData);
|
||||||
|
|
||||||
pVal = &pBlock->info.pks[1];
|
SValue* p = &pBlock->info.pks[1];
|
||||||
pVal->type = pDataBlock->info.pks[1].type;
|
p->type = pDataBlock->info.pks[1].type;
|
||||||
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
|
p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
|
||||||
pVal->nData = pDataBlock->info.pks[1].nData;
|
p->nData = pDataBlock->info.pks[1].nData;
|
||||||
|
memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData);
|
||||||
|
uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData);
|
||||||
|
uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData);
|
||||||
|
} else {
|
||||||
|
uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (copyData) {
|
if (copyData) {
|
||||||
|
|
|
@ -119,7 +119,7 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
|
||||||
return ret > 0 ? 1 : -1;
|
return ret > 0 ? 1 : -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
|
return p1->pks[0].val - p2->pks[0].val;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -179,9 +179,8 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(indices[i].type)) {
|
if (IS_VAR_DATA_TYPE(indices[i].type)) {
|
||||||
tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData);
|
tdata += tGetU32v(tdata, &pKey->pks[i].nData);
|
||||||
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData);
|
memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData);
|
||||||
pKey->pks[i].pData += pKey->pks[i].nData;
|
|
||||||
} else {
|
} else {
|
||||||
memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes);
|
memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes);
|
||||||
}
|
}
|
||||||
|
@ -396,14 +395,18 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk) {
|
bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) {
|
||||||
|
return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type);
|
||||||
|
}
|
||||||
|
|
||||||
|
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) {
|
||||||
pIter->order = order;
|
pIter->order = order;
|
||||||
pIter->index = -1;
|
pIter->index = -1;
|
||||||
pIter->numOfBlocks = 0;
|
pIter->numOfBlocks = 0;
|
||||||
if (pIter->blockList == NULL) {
|
if (pIter->blockList == NULL) {
|
||||||
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
|
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
|
||||||
} else {
|
} else {
|
||||||
clearDataBlockIterator(pIter, hasPk);
|
clearDataBlockIterator(pIter, needFree);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -821,18 +824,13 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo keep the the last returned key
|
|
||||||
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
|
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
|
||||||
// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
|
||||||
pDumpInfo->allDumped = true;
|
pDumpInfo->allDumped = true;
|
||||||
// ASSERT(0);
|
|
||||||
// pDumpInfo->lastKey.key.ts = maxKey + step;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks,
|
static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks,
|
||||||
bool asc) {
|
bool asc) {
|
||||||
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
|
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||||
|
|
||||||
pKey->numOfPKs = numOfPks;
|
pKey->numOfPKs = numOfPks;
|
||||||
if (pKey->numOfPKs <= 0) {
|
if (pKey->numOfPKs <= 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -842,7 +840,7 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa
|
||||||
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
|
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
|
||||||
} else {
|
} else {
|
||||||
uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData;
|
uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData;
|
||||||
pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen;
|
pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData);
|
||||||
memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData);
|
memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2838,11 +2836,11 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
|
||||||
pInfo->pks[0].val = pBlockInfo->firstPk.val;
|
pInfo->pks[0].val = pBlockInfo->firstPk.val;
|
||||||
pInfo->pks[1].val = pBlockInfo->lastPk.val;
|
pInfo->pks[1].val = pBlockInfo->lastPk.val;
|
||||||
} else {
|
} else {
|
||||||
memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen);
|
memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData));
|
||||||
memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen);
|
memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData));
|
||||||
|
|
||||||
pInfo->pks[0].nData = pBlockInfo->firstPKLen;
|
pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData);
|
||||||
pInfo->pks[1].nData = pBlockInfo->lastPKLen;
|
pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3203,7 +3201,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
|
||||||
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
|
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
|
||||||
} else { // no block data, only last block exists
|
} else { // no block data, only last block exists
|
||||||
tBlockDataReset(&pReader->status.fileBlockData);
|
tBlockDataReset(&pReader->status.fileBlockData);
|
||||||
resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
|
resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||||
resetTableListIndex(&pReader->status);
|
resetTableListIndex(&pReader->status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3313,7 +3311,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tBlockDataReset(pBlockData);
|
tBlockDataReset(pBlockData);
|
||||||
resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
|
resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||||
resetTableListIndex(&pReader->status);
|
resetTableListIndex(&pReader->status);
|
||||||
|
|
||||||
ERetrieveType type = doReadDataFromSttFiles(pReader);
|
ERetrieveType type = doReadDataFromSttFiles(pReader);
|
||||||
|
@ -4158,7 +4156,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
|
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
|
||||||
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
|
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pStatus->fileIter.numOfFiles == 0) {
|
if (pStatus->fileIter.numOfFiles == 0) {
|
||||||
|
@ -4362,7 +4360,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
||||||
|
|
||||||
taosMemoryFree(pSupInfo->colId);
|
taosMemoryFree(pSupInfo->colId);
|
||||||
tBlockDataDestroy(&pReader->status.fileBlockData);
|
tBlockDataDestroy(&pReader->status.fileBlockData);
|
||||||
cleanupDataBlockIterator(&pReader->status.blockIter, pReader->suppInfo.numOfPks > 0);
|
cleanupDataBlockIterator(&pReader->status.blockIter, shouldFreePkBuf(&pReader->suppInfo));
|
||||||
|
|
||||||
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||||
if (pReader->status.pTableMap != NULL) {
|
if (pReader->status.pTableMap != NULL) {
|
||||||
|
@ -5038,7 +5036,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
|
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
|
||||||
|
|
||||||
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
|
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
|
||||||
resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
|
resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||||
resetTableListIndex(&pReader->status);
|
resetTableListIndex(&pReader->status);
|
||||||
|
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
|
@ -446,17 +446,17 @@ void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) {
|
||||||
if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
|
if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
|
||||||
pBlockInfo->firstPk.val = pFirstKey->pks[0].val;
|
pBlockInfo->firstPk.val = pFirstKey->pks[0].val;
|
||||||
pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val;
|
pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val;
|
||||||
|
} else {
|
||||||
|
char* p = taosMemoryCalloc(1, pFirstKey->pks[0].nData + VARSTR_HEADER_SIZE);
|
||||||
|
memcpy(varDataVal(p), pFirstKey->pks[0].pData, pFirstKey->pks[0].nData);
|
||||||
|
varDataSetLen(p, pFirstKey->pks[0].nData);
|
||||||
|
pBlockInfo->firstPk.pData = (uint8_t*)p;
|
||||||
|
|
||||||
pBlockInfo->firstPKLen = 0;
|
int32_t keyLen = record->lastKey.key.pks[0].nData;
|
||||||
pBlockInfo->lastPKLen = 0;
|
p = taosMemoryCalloc(1, keyLen + VARSTR_HEADER_SIZE);
|
||||||
} else { // todo handle memory alloc error, opt memory alloc perf
|
memcpy(varDataVal(p), record->lastKey.key.pks[0].pData, keyLen);
|
||||||
pBlockInfo->firstPKLen = pFirstKey->pks[0].nData;
|
varDataSetLen(p, keyLen);
|
||||||
pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen);
|
pBlockInfo->lastPk.pData = (uint8_t*)p;
|
||||||
memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen);
|
|
||||||
|
|
||||||
pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData;
|
|
||||||
pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen);
|
|
||||||
memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -467,21 +467,21 @@ static void freePkItem(void* pItem) {
|
||||||
taosMemoryFreeClear(p->lastPk.pData);
|
taosMemoryFreeClear(p->lastPk.pData);
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk) {
|
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree) {
|
||||||
pIter->index = -1;
|
pIter->index = -1;
|
||||||
pIter->numOfBlocks = 0;
|
pIter->numOfBlocks = 0;
|
||||||
|
|
||||||
if (hasPk) {
|
if (needFree) {
|
||||||
taosArrayClearEx(pIter->blockList, freePkItem);
|
taosArrayClearEx(pIter->blockList, freePkItem);
|
||||||
} else {
|
} else {
|
||||||
taosArrayClear(pIter->blockList);
|
taosArrayClear(pIter->blockList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) {
|
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool needFree) {
|
||||||
pIter->index = -1;
|
pIter->index = -1;
|
||||||
pIter->numOfBlocks = 0;
|
pIter->numOfBlocks = 0;
|
||||||
if (hasPk) {
|
if (needFree) {
|
||||||
taosArrayDestroyEx(pIter->blockList, freePkItem);
|
taosArrayDestroyEx(pIter->blockList, freePkItem);
|
||||||
} else {
|
} else {
|
||||||
taosArrayDestroy(pIter->blockList);
|
taosArrayDestroy(pIter->blockList);
|
||||||
|
@ -492,7 +492,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
SBlockOrderSupporter sup = {0};
|
SBlockOrderSupporter sup = {0};
|
||||||
clearDataBlockIterator(pBlockIter, pReader->suppInfo.numOfPks > 0);
|
clearDataBlockIterator(pBlockIter, shouldFreePkBuf(&pReader->suppInfo));
|
||||||
|
|
||||||
pBlockIter->numOfBlocks = numOfBlocks;
|
pBlockIter->numOfBlocks = numOfBlocks;
|
||||||
|
|
||||||
|
|
|
@ -212,8 +212,6 @@ typedef struct SFileDataBlockInfo {
|
||||||
uint8_t* pData;
|
uint8_t* pData;
|
||||||
} lastPk;
|
} lastPk;
|
||||||
|
|
||||||
int32_t firstPKLen;
|
|
||||||
int32_t lastPKLen;
|
|
||||||
int64_t minVer;
|
int64_t minVer;
|
||||||
int64_t maxVer;
|
int64_t maxVer;
|
||||||
int64_t blockOffset;
|
int64_t blockOffset;
|
||||||
|
@ -349,8 +347,9 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
|
||||||
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
|
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
|
||||||
void clearRowKey(SRowKey* pKey);
|
void clearRowKey(SRowKey* pKey);
|
||||||
|
|
||||||
|
bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp);
|
||||||
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
|
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
|
||||||
void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
|
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree);
|
||||||
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
|
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo {
|
||||||
int32_t scanTimes;
|
int32_t scanTimes;
|
||||||
int32_t readIdx;
|
int32_t readIdx;
|
||||||
SSDataBlock* pResBlock;
|
SSDataBlock* pResBlock;
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
SSHashObj* mTableNumRows; // uid->num of table rows
|
SSHashObj* mTableNumRows; // uid->num of table rows
|
||||||
SHashObj* mSkipTables;
|
SHashObj* mSkipTables;
|
||||||
int64_t mergeLimit;
|
int64_t mergeLimit;
|
||||||
SSortExecInfo sortExecInfo;
|
SSortExecInfo sortExecInfo;
|
||||||
bool needCountEmptyTable;
|
bool needCountEmptyTable;
|
||||||
bool bGroupProcessed; // the group return data means processed
|
bool bGroupProcessed; // the group return data means processed
|
||||||
bool filesetDelimited;
|
bool filesetDelimited;
|
||||||
bool bNewFilesetEvent;
|
bool bNewFilesetEvent;
|
||||||
bool bNextDurationBlockEvent;
|
bool bNextDurationBlockEvent;
|
||||||
int32_t numNextDurationBlocks;
|
int32_t numNextDurationBlocks;
|
||||||
SSDataBlock* nextDurationBlocks[2];
|
SSDataBlock* nextDurationBlocks[2];
|
||||||
bool rtnNextDurationBlocks;
|
bool rtnNextDurationBlocks;
|
||||||
int32_t nextDurationBlocksIdx;
|
int32_t nextDurationBlocksIdx;
|
||||||
|
bool bSortRowId;
|
||||||
bool bSortRowId;
|
|
||||||
|
|
||||||
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
|
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
|
@ -261,6 +261,7 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
|
||||||
pBlockInfo->pks[0].type = pInfoData->info.type;
|
pBlockInfo->pks[0].type = pInfoData->info.type;
|
||||||
pBlockInfo->pks[1].type = pInfoData->info.type;
|
pBlockInfo->pks[1].type = pInfoData->info.type;
|
||||||
|
|
||||||
|
// allocate enough buffer size, which is pInfoData->info.bytes
|
||||||
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
|
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
|
||||||
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
||||||
if (pBlockInfo->pks[0].pData == NULL) {
|
if (pBlockInfo->pks[0].pData == NULL) {
|
||||||
|
|
|
@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
|
||||||
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
|
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
|
||||||
|
|
||||||
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
|
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
|
||||||
|
|
||||||
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
|
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
|
||||||
|
|
||||||
pInfo->pResBlock->info.blankFill = false;
|
pInfo->pResBlock->info.blankFill = false;
|
||||||
|
|
||||||
if (!pInfo->needCountEmptyTable) {
|
if (!pInfo->needCountEmptyTable) {
|
||||||
|
@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
STableKeyInfo* pList = NULL;
|
STableKeyInfo* pList = NULL;
|
||||||
|
|
||||||
if (pInfo->currentGroupId == -1) {
|
if (pInfo->currentGroupId == -1) {
|
||||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||||
|
@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosRLockLatch(&pTaskInfo->lock);
|
||||||
initNextGroupScan(pInfo, &pList, &num);
|
initNextGroupScan(pInfo, &pList, &num);
|
||||||
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
|
|
||||||
ASSERT(pInfo->base.dataReader == NULL);
|
ASSERT(pInfo->base.dataReader == NULL);
|
||||||
|
|
||||||
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||||
|
@ -4134,14 +4135,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
|
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
SSDataBlock* pBlock = pInfo->pReaderBlock;
|
||||||
SSDataBlock* pBlock = pInfo->pReaderBlock;
|
int32_t code = 0;
|
||||||
int32_t code = 0;
|
bool hasNext = false;
|
||||||
bool hasNext = false;
|
STsdbReader* reader = pInfo->base.dataReader;
|
||||||
STsdbReader* reader = pInfo->base.dataReader;
|
|
||||||
|
|
||||||
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -4177,27 +4177,24 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
|
||||||
*pSkipped = true;
|
*pSkipped = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
STableMergeScanSortSourceParam* source = param;
|
STableMergeScanSortSourceParam* source = param;
|
||||||
SOperatorInfo* pOperator = source->pOperator;
|
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
|
||||||
|
|
||||||
SSDataBlock* pBlock = NULL;
|
SOperatorInfo* pOperator = source->pOperator;
|
||||||
int32_t code = 0;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
bool hasNext = false;
|
|
||||||
|
|
||||||
STsdbReader* reader = pInfo->base.dataReader;
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (pInfo->rtnNextDurationBlocks) {
|
if (pInfo->rtnNextDurationBlocks) {
|
||||||
qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d",
|
qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d",
|
||||||
GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks);
|
GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks);
|
||||||
|
|
||||||
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
|
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
|
||||||
pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
|
pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
|
||||||
++pInfo->nextDurationBlocksIdx;
|
++pInfo->nextDurationBlocksIdx;
|
||||||
|
@ -4206,19 +4203,19 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
blockDataDestroy(pInfo->nextDurationBlocks[i]);
|
blockDataDestroy(pInfo->nextDurationBlocks[i]);
|
||||||
pInfo->nextDurationBlocks[i] = NULL;
|
pInfo->nextDurationBlocks[i] = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->rtnNextDurationBlocks = false;
|
pInfo->rtnNextDurationBlocks = false;
|
||||||
pInfo->nextDurationBlocksIdx = 0;
|
pInfo->nextDurationBlocksIdx = 0;
|
||||||
pInfo->numNextDurationBlocks = 0;
|
pInfo->numNextDurationBlocks = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
bool bFinished = false;
|
bool bFinished = false;
|
||||||
bool bSkipped = false;
|
bool bSkipped = false;
|
||||||
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
|
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
|
||||||
pBlock = pInfo->pReaderBlock;
|
pBlock = pInfo->pReaderBlock;
|
||||||
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
|
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
|
||||||
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
|
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
|
||||||
if (bFinished) {
|
if (bFinished) {
|
||||||
pInfo->bNewFilesetEvent = false;
|
pInfo->bNewFilesetEvent = false;
|
||||||
break;
|
break;
|
||||||
|
@ -4229,15 +4226,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
|
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
|
||||||
++pInfo->numNextDurationBlocks;
|
++pInfo->numNextDurationBlocks;
|
||||||
if (pInfo->numNextDurationBlocks > 2) {
|
if (pInfo->numNextDurationBlocks > 2) {
|
||||||
qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks);
|
qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo),
|
||||||
|
pInfo->numNextDurationBlocks);
|
||||||
pInfo->bNewFilesetEvent = false;
|
pInfo->bNewFilesetEvent = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->bNewFilesetEvent) {
|
if (pInfo->bNewFilesetEvent) {
|
||||||
pInfo->rtnNextDurationBlocks = true;
|
pInfo->rtnNextDurationBlocks = true;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->bNextDurationBlockEvent) {
|
if (pInfo->bNextDurationBlockEvent) {
|
||||||
pInfo->bNextDurationBlockEvent = false;
|
pInfo->bNextDurationBlockEvent = false;
|
||||||
continue;
|
continue;
|
||||||
|
@ -4245,19 +4245,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
}
|
}
|
||||||
if (bSkipped) continue;
|
if (bSkipped) continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
|
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
|
||||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) {
|
SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) {
|
||||||
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||||
SBlockOrderInfo biTs = {0};
|
SBlockOrderInfo biTs = {0};
|
||||||
|
|
|
@ -1339,11 +1339,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
||||||
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
|
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
|
||||||
doKeepTuple(pRowSup, tsList[j], gid);
|
doKeepTuple(pRowSup, tsList[j], gid);
|
||||||
} else { // start a new session window
|
} else { // start a new session window
|
||||||
|
// start a new session window
|
||||||
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
|
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
// keep the time window for the closed time window.
|
// keep the time window for the closed time window.
|
||||||
STimeWindow window = pRowSup->win;
|
STimeWindow window = pRowSup->win;
|
||||||
|
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
|
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
|
||||||
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||||
|
|
|
@ -1649,8 +1649,8 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
|
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
|
||||||
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
|
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
|
||||||
|
|
||||||
size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
|
size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
|
||||||
|
|
||||||
|
@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
|
|
||||||
if (pBlk != NULL) {
|
if (pBlk != NULL) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
|
||||||
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
||||||
if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||||
(pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
(pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||||
if (bExtractedBlock) {
|
if (bExtractedBlock) {
|
||||||
blockDataDestroy(pBlk);
|
blockDataDestroy(pBlk);
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1945,10 +1945,14 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
|
||||||
if (code == TSDB_CODE_SUCCESS && bFirstTable) {
|
if (code == TSDB_CODE_SUCCESS && bFirstTable) {
|
||||||
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
|
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
|
||||||
}
|
}
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
|
code =
|
||||||
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
|
insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
|
||||||
initTableColSubmitData(*ppTableDataCxt);
|
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
|
||||||
|
}
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
code = initTableColSubmitData(*ppTableDataCxt);
|
||||||
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
|
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
|
||||||
code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
|
code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
|
||||||
|
@ -1965,7 +1969,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
|
||||||
|
|
||||||
clearStbRowsDataContext(pStbRowsCxt);
|
clearStbRowsDataContext(pStbRowsCxt);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
|
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
|
||||||
|
|
|
@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
||||||
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||||
if (commitIndex >= ths->assignedCommitIndex) {
|
if (commitIndex >= ths->assignedCommitIndex) {
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
syncNodeStepDown(ths, pMsg->term);
|
||||||
raftStoreNextTerm(ths);
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (syncNodeAssignedLeader2Leader(ths) != 0) {
|
|
||||||
sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexLock(&ths->arbTokenMutex);
|
|
||||||
syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken);
|
|
||||||
sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken);
|
|
||||||
taosThreadMutexUnlock(&ths->arbTokenMutex);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
|
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
|
||||||
|
|
|
@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
sError("sync step down error");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
syncNodeStepDown(pSyncNode, newTerm);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
|
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
|
|
||||||
// start in syncNodeStart
|
// start in syncNodeStart
|
||||||
// start raft
|
// start raft
|
||||||
// syncNodeBecomeFollower(pSyncNode);
|
|
||||||
|
|
||||||
int64_t timeNow = taosGetTimestampMs();
|
int64_t timeNow = taosGetTimestampMs();
|
||||||
pSyncNode->startTime = timeNow;
|
pSyncNode->startTime = timeNow;
|
||||||
|
@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
||||||
|
|
||||||
// persist cfg
|
// persist cfg
|
||||||
syncWriteCfgFile(pSyncNode);
|
syncWriteCfgFile(pSyncNode);
|
||||||
|
|
||||||
#if 0
|
|
||||||
// change isStandBy to normal (election timeout)
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
|
||||||
syncNodeBecomeLeader(pSyncNode, "");
|
|
||||||
|
|
||||||
// Raft 3.6.2 Committing entries from previous terms
|
|
||||||
syncNodeAppendNoop(pSyncNode);
|
|
||||||
// syncMaybeAdvanceCommitIndex(pSyncNode);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
syncNodeBecomeFollower(pSyncNode, "");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
} else {
|
} else {
|
||||||
// persist cfg
|
// persist cfg
|
||||||
syncWriteCfgFile(pSyncNode);
|
syncWriteCfgFile(pSyncNode);
|
||||||
|
@ -1874,18 +1845,6 @@ _END:
|
||||||
}
|
}
|
||||||
|
|
||||||
// raft state change --------------
|
// raft state change --------------
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
|
||||||
if (term > raftStoreGetTerm(pSyncNode)) {
|
|
||||||
raftStoreSetTerm(pSyncNode, term);
|
|
||||||
char tmpBuf[64];
|
|
||||||
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
|
|
||||||
syncNodeBecomeFollower(pSyncNode, tmpBuf);
|
|
||||||
raftStoreClearVote(pSyncNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
|
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
|
||||||
if (term > raftStoreGetTerm(pSyncNode)) {
|
if (term > raftStoreGetTerm(pSyncNode)) {
|
||||||
raftStoreSetTerm(pSyncNode, term);
|
raftStoreSetTerm(pSyncNode, term);
|
||||||
|
@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
||||||
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
|
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||||
|
taosThreadMutexLock(&pSyncNode->arbTokenMutex);
|
||||||
|
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
|
||||||
|
sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
|
||||||
|
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
|
||||||
|
}
|
||||||
|
|
||||||
if (currentTerm < newTerm) {
|
if (currentTerm < newTerm) {
|
||||||
raftStoreSetTerm(pSyncNode, newTerm);
|
raftStoreSetTerm(pSyncNode, newTerm);
|
||||||
char tmpBuf[64];
|
char tmpBuf[64];
|
||||||
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
|
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
|
||||||
syncNodeBecomeFollower(pSyncNode, tmpBuf);
|
syncNodeBecomeFollower(pSyncNode, tmpBuf);
|
||||||
raftStoreClearVote(pSyncNode);
|
raftStoreClearVote(pSyncNode);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
syncNodeBecomeFollower(pSyncNode, "step down");
|
syncNodeBecomeFollower(pSyncNode, "step down");
|
||||||
|
@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||||
sNTrace(pSyncNode, "follower to candidate");
|
sNTrace(pSyncNode, "follower to candidate");
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
|
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
|
|
||||||
syncNodeBecomeFollower(pSyncNode, "leader to follower");
|
|
||||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
|
||||||
sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
|
||||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
|
||||||
|
|
||||||
sNTrace(pSyncNode, "leader to follower");
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
|
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
|
||||||
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
|
|
||||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
|
||||||
sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
|
||||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
|
||||||
|
|
||||||
sNTrace(pSyncNode, "candidate to follower");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
|
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
|
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
|
||||||
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
|
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
|
||||||
|
|
Loading…
Reference in New Issue