Merge pull request #24380 from taosdata/fix/TD-28134-3.0
fix: possible race condition
This commit is contained in:
commit
a518cba133
|
@ -397,6 +397,7 @@ struct STbData {
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
TSKEY minKey;
|
TSKEY minKey;
|
||||||
TSKEY maxKey;
|
TSKEY maxKey;
|
||||||
|
SRWLatch lock;
|
||||||
SDelData *pHead;
|
SDelData *pHead;
|
||||||
SDelData *pTail;
|
SDelData *pTail;
|
||||||
SMemSkipList sl;
|
SMemSkipList sl;
|
||||||
|
|
|
@ -181,6 +181,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
pDelData->sKey = sKey;
|
pDelData->sKey = sKey;
|
||||||
pDelData->eKey = eKey;
|
pDelData->eKey = eKey;
|
||||||
pDelData->pNext = NULL;
|
pDelData->pNext = NULL;
|
||||||
|
taosWLockLatch(&pTbData->lock);
|
||||||
if (pTbData->pHead == NULL) {
|
if (pTbData->pHead == NULL) {
|
||||||
ASSERT(pTbData->pTail == NULL);
|
ASSERT(pTbData->pTail == NULL);
|
||||||
pTbData->pHead = pTbData->pTail = pDelData;
|
pTbData->pHead = pTbData->pTail = pDelData;
|
||||||
|
@ -188,6 +189,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
pTbData->pTail->pNext = pDelData;
|
pTbData->pTail->pNext = pDelData;
|
||||||
pTbData->pTail = pDelData;
|
pTbData->pTail = pDelData;
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pTbData->lock);
|
||||||
|
|
||||||
pMemTable->nDel++;
|
pMemTable->nDel++;
|
||||||
pMemTable->minVer = TMIN(pMemTable->minVer, version);
|
pMemTable->minVer = TMIN(pMemTable->minVer, version);
|
||||||
|
@ -401,6 +403,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
||||||
SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
|
SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
|
||||||
SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
|
SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
|
||||||
}
|
}
|
||||||
|
taosInitRWLatch(&pTbData->lock);
|
||||||
|
|
||||||
taosWLockLatch(&pMemTable->latch);
|
taosWLockLatch(&pMemTable->latch);
|
||||||
|
|
||||||
|
|
|
@ -22,9 +22,14 @@
|
||||||
#include "tsdbUtil2.h"
|
#include "tsdbUtil2.h"
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
|
|
||||||
#define INIT_TIMEWINDOW(_w) do { (_w)->skey = INT64_MAX; (_w)->ekey = INT64_MIN;} while(0);
|
#define INIT_TIMEWINDOW(_w) \
|
||||||
|
do { \
|
||||||
|
(_w)->skey = INT64_MAX; \
|
||||||
|
(_w)->ekey = INT64_MIN; \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
|
||||||
|
int32_t order);
|
||||||
|
|
||||||
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||||
int32_t num = numOfTables / pBuf->numPerBucket;
|
int32_t num = numOfTables / pBuf->numPerBucket;
|
||||||
|
@ -449,7 +454,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
||||||
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
||||||
|
|
||||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||||
STableBlockScanInfo *pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
|
STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
|
||||||
if (pTableScanInfo->pBlockIdxList == NULL) {
|
if (pTableScanInfo->pBlockIdxList == NULL) {
|
||||||
size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList);
|
size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList);
|
||||||
pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx));
|
pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx));
|
||||||
|
@ -504,7 +509,7 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STombRecord record = {0};
|
STombRecord record = {0};
|
||||||
|
|
||||||
uint64_t uid = pReader->status.uidList.tableUidList[*j];
|
uint64_t uid = pReader->status.uidList.tableUidList[*j];
|
||||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||||
if (pScanInfo->pFileDelData == NULL) {
|
if (pScanInfo->pFileDelData == NULL) {
|
||||||
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||||
|
@ -595,12 +600,12 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// uint64_t uid = pReader->status.uidList.tableUidList[j];
|
// uint64_t uid = pReader->status.uidList.tableUidList[j];
|
||||||
|
|
||||||
// STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
// STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||||
// if (pScanInfo->pFileDelData == NULL) {
|
// if (pScanInfo->pFileDelData == NULL) {
|
||||||
// pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
// pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||||
// }
|
// }
|
||||||
|
|
||||||
ETombBlkCheckEnum ret = 0;
|
ETombBlkCheckEnum ret = 0;
|
||||||
code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret);
|
code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret);
|
||||||
|
@ -650,6 +655,7 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
|
||||||
|
|
||||||
SDelData* p = NULL;
|
SDelData* p = NULL;
|
||||||
if (pMemTbData != NULL) {
|
if (pMemTbData != NULL) {
|
||||||
|
taosRLockLatch(&pMemTbData->lock);
|
||||||
p = pMemTbData->pHead;
|
p = pMemTbData->pHead;
|
||||||
while (p) {
|
while (p) {
|
||||||
if (p->version <= ver) {
|
if (p->version <= ver) {
|
||||||
|
@ -658,6 +664,7 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
|
||||||
|
|
||||||
p = p->pNext;
|
p = p->pNext;
|
||||||
}
|
}
|
||||||
|
taosRUnLockLatch(&pMemTbData->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (piMemTbData != NULL) {
|
if (piMemTbData != NULL) {
|
||||||
|
@ -681,7 +688,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].maxTbid.suid < suid)) {
|
while ((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].maxTbid.suid < suid)) {
|
||||||
++i;
|
++i;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -689,7 +696,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
SStatisBlk* p = &pStatisBlkArray->data[i];
|
||||||
STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||||
tStatisBlockInit(pStatisBlock);
|
tStatisBlockInit(pStatisBlock);
|
||||||
|
|
||||||
|
@ -759,14 +766,14 @@ void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
|
||||||
if (size < numOfFileObj) {
|
if (size < numOfFileObj) {
|
||||||
int32_t inc = numOfFileObj - size;
|
int32_t inc = numOfFileObj - size;
|
||||||
for (int32_t k = 0; k < inc; ++k) {
|
for (int32_t k = 0; k < inc; ++k) {
|
||||||
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
taosArrayPush(pLDIterList, &pIter);
|
taosArrayPush(pLDIterList, &pIter);
|
||||||
}
|
}
|
||||||
} else if (size > numOfFileObj) { // remove unused LDataIter
|
} else if (size > numOfFileObj) { // remove unused LDataIter
|
||||||
int32_t inc = size - numOfFileObj;
|
int32_t inc = size - numOfFileObj;
|
||||||
|
|
||||||
for (int i = 0; i < inc; ++i) {
|
for (int i = 0; i < inc; ++i) {
|
||||||
SLDataIter *pIter = taosArrayPop(pLDIterList);
|
SLDataIter* pIter = taosArrayPop(pLDIterList);
|
||||||
destroyLDataIter(pIter);
|
destroyLDataIter(pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -781,9 +788,9 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet)
|
||||||
taosArrayPush(pSttFileBlockIterArray, &pList);
|
taosArrayPush(pSttFileBlockIterArray, &pList);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t j = 0; j < numOfLevels; ++j) {
|
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||||
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
|
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
|
||||||
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
|
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
|
||||||
doAdjustValidDataIters(pList, TARRAY2_SIZE(pSttLevel->fobjArr));
|
doAdjustValidDataIters(pList, TARRAY2_SIZE(pSttLevel->fobjArr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -829,8 +836,8 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
||||||
}
|
}
|
||||||
|
|
||||||
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||||
TStatisBlkArray *pStatisBlkArray = NULL;
|
TStatisBlkArray* pStatisBlkArray = NULL;
|
||||||
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
|
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray**)&pStatisBlkArray);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
|
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
|
||||||
continue;
|
continue;
|
||||||
|
@ -899,11 +906,12 @@ static int32_t sortUidComparFn(const void* p1, const void* p2) {
|
||||||
if (px1->skey == px2->skey) {
|
if (px1->skey == px2->skey) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return px1->skey < px2->skey? -1:1;
|
return px1->skey < px2->skey ? -1 : 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo *pScanInfo, int32_t order) {
|
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo,
|
||||||
|
int32_t order) {
|
||||||
// check if it overlap with del skyline
|
// check if it overlap with del skyline
|
||||||
taosArraySort(pTimewindowList, sortUidComparFn);
|
taosArraySort(pTimewindowList, sortUidComparFn);
|
||||||
|
|
||||||
|
@ -935,7 +943,7 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableB
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
|
static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
|
||||||
int32_t startIndex) {
|
int32_t startIndex) {
|
||||||
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
|
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
|
||||||
|
|
||||||
for (int32_t i = startIndex; i < num; i += 1) {
|
for (int32_t i = startIndex; i < num; i += 1) {
|
||||||
|
|
Loading…
Reference in New Issue