Merge pull request #21111 from taosdata/fix/liaohj_main
enh(query): stop tsdb reader ASAP. TD-23249
This commit is contained in:
commit
ad7c3218f7
|
@ -92,7 +92,6 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*assert(connectRsp.epSet.numOfEps > 0);*/
|
|
||||||
if (connectRsp.epSet.numOfEps == 0) {
|
if (connectRsp.epSet.numOfEps == 0) {
|
||||||
setErrno(pRequest, TSDB_CODE_APP_ERROR);
|
setErrno(pRequest, TSDB_CODE_APP_ERROR);
|
||||||
tsem_post(&pRequest->body.rspSem);
|
tsem_post(&pRequest->body.rspSem);
|
||||||
|
|
|
@ -982,7 +982,6 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// assert(pCur->vgroupIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0);
|
|
||||||
if (pCur->vgroupIndex != -1) {
|
if (pCur->vgroupIndex != -1) {
|
||||||
tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
|
tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,11 +175,8 @@ typedef struct STsdbReader STsdbReader;
|
||||||
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
||||||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||||
|
|
||||||
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
|
||||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||||
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
|
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
|
||||||
|
|
||||||
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
|
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||||
|
@ -190,7 +187,10 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void *tsdbGetIdx(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
|
||||||
|
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
||||||
|
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
||||||
|
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||||
|
|
||||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||||
uint64_t suid, void **pReader, const char *idstr);
|
uint64_t suid, void **pReader, const char *idstr);
|
||||||
|
@ -232,25 +232,20 @@ typedef struct SSnapContext {
|
||||||
|
|
||||||
typedef struct STqReader {
|
typedef struct STqReader {
|
||||||
SPackedData msg2;
|
SPackedData msg2;
|
||||||
|
|
||||||
SSubmitReq2 submit;
|
SSubmitReq2 submit;
|
||||||
int32_t nextBlk;
|
int32_t nextBlk;
|
||||||
|
|
||||||
int64_t lastBlkUid;
|
int64_t lastBlkUid;
|
||||||
|
|
||||||
SWalReader *pWalReader;
|
SWalReader *pWalReader;
|
||||||
|
|
||||||
SMeta *pVnodeMeta;
|
SMeta *pVnodeMeta;
|
||||||
SHashObj *tbIdHash;
|
SHashObj *tbIdHash;
|
||||||
SArray *pColIdList; // SArray<int16_t>
|
SArray *pColIdList; // SArray<int16_t>
|
||||||
|
|
||||||
int32_t cachedSchemaVer;
|
int32_t cachedSchemaVer;
|
||||||
int64_t cachedSchemaSuid;
|
int64_t cachedSchemaSuid;
|
||||||
SSchemaWrapper *pSchemaWrapper;
|
SSchemaWrapper *pSchemaWrapper;
|
||||||
STSchema *pSchema;
|
STSchema *pSchema;
|
||||||
} STqReader;
|
} STqReader;
|
||||||
|
|
||||||
STqReader *tqOpenReader(SVnode *pVnode);
|
STqReader *tqReaderOpen(SVnode *pVnode);
|
||||||
void tqCloseReader(STqReader *);
|
void tqCloseReader(STqReader *);
|
||||||
|
|
||||||
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
||||||
|
@ -266,7 +261,7 @@ int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, i
|
||||||
bool tqNextBlockImpl(STqReader *pReader);
|
bool tqNextBlockImpl(STqReader *pReader);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
||||||
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
||||||
|
|
||||||
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
|
|
@ -504,7 +504,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.pTqReader = tqOpenReader(pVnode);
|
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||||
|
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
@ -523,7 +523,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
|
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
|
||||||
}
|
}
|
||||||
pHandle->execHandle.pTqReader = tqOpenReader(pVnode);
|
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
|
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
|
|
|
@ -328,7 +328,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
}
|
}
|
||||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode);
|
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||||
|
|
||||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
||||||
(SSnapContext**)(&reader.sContext));
|
(SSnapContext**)(&reader.sContext));
|
||||||
|
@ -343,7 +343,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
|
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
|
||||||
}
|
}
|
||||||
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode);
|
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||||
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
|
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
|
|
|
@ -249,7 +249,7 @@ END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqReader* tqOpenReader(SVnode* pVnode) {
|
STqReader* tqReaderOpen(SVnode* pVnode) {
|
||||||
STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
|
STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
|
||||||
if (pReader == NULL) {
|
if (pReader == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -653,7 +653,7 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
|
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
|
||||||
tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
|
tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
|
|
|
@ -207,7 +207,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
||||||
taosArrayClear(pBlocks);
|
taosArrayClear(pBlocks);
|
||||||
taosArrayClear(pSchemas);
|
taosArrayClear(pSchemas);
|
||||||
SSubmitTbData* pSubmitTbDataRet = NULL;
|
SSubmitTbData* pSubmitTbDataRet = NULL;
|
||||||
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
|
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
}
|
}
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
|
@ -266,7 +266,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
||||||
taosArrayClear(pBlocks);
|
taosArrayClear(pBlocks);
|
||||||
taosArrayClear(pSchemas);
|
taosArrayClear(pSchemas);
|
||||||
SSubmitTbData* pSubmitTbDataRet = NULL;
|
SSubmitTbData* pSubmitTbDataRet = NULL;
|
||||||
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
|
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
}
|
}
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
|
|
|
@ -20,6 +20,12 @@
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
|
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
READER_STATUS_SUSPEND = 0x1,
|
||||||
|
READER_STATUS_SHOULD_STOP = 0x2,
|
||||||
|
READER_STATUS_NORMAL = 0x3,
|
||||||
|
} EReaderExecStatus;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
EXTERNAL_ROWS_PREV = 0x1,
|
EXTERNAL_ROWS_PREV = 0x1,
|
||||||
EXTERNAL_ROWS_MAIN = 0x2,
|
EXTERNAL_ROWS_MAIN = 0x2,
|
||||||
|
@ -180,19 +186,23 @@ typedef struct STsdbReaderAttr {
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
} STsdbReaderAttr;
|
} STsdbReaderAttr;
|
||||||
|
|
||||||
|
typedef struct SResultBlockInfo {
|
||||||
|
SSDataBlock* pResBlock;
|
||||||
|
bool freeBlock;
|
||||||
|
int64_t capacity;
|
||||||
|
} SResultBlockInfo;
|
||||||
|
|
||||||
struct STsdbReader {
|
struct STsdbReader {
|
||||||
STsdb* pTsdb;
|
STsdb* pTsdb;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
TdThreadMutex readerMutex;
|
TdThreadMutex readerMutex;
|
||||||
bool suspended;
|
EReaderExecStatus flag;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
int16_t order;
|
int16_t order;
|
||||||
bool freeBlock;
|
|
||||||
EReadMode readMode;
|
EReadMode readMode;
|
||||||
uint64_t rowsNum;
|
uint64_t rowsNum;
|
||||||
STimeWindow window; // the primary query time window that applies to all queries
|
STimeWindow window; // the primary query time window that applies to all queries
|
||||||
SSDataBlock* pResBlock;
|
SResultBlockInfo resBlockInfo;
|
||||||
int32_t capacity;
|
|
||||||
SReaderStatus status;
|
SReaderStatus status;
|
||||||
char* idStr; // query info handle, for debug purpose
|
char* idStr; // query info handle, for debug purpose
|
||||||
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
|
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
|
||||||
|
@ -205,7 +215,7 @@ struct STsdbReader {
|
||||||
SDelFReader* pDelFReader; // the del file reader
|
SDelFReader* pDelFReader; // the del file reader
|
||||||
SArray* pDelIdx; // del file block index;
|
SArray* pDelIdx; // del file block index;
|
||||||
SBlockInfoBuf blockInfoBuf;
|
SBlockInfoBuf blockInfoBuf;
|
||||||
int32_t step;
|
EContentData step;
|
||||||
STsdbReader* innerReader[2];
|
STsdbReader* innerReader[2];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -727,6 +737,21 @@ void tsdbReleaseDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, SQueryTableDataCond* pCond) {
|
||||||
|
pResBlockInfo->capacity = capacity;
|
||||||
|
pResBlockInfo->pResBlock = pResBlock;
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
|
if (pResBlockInfo->pResBlock == NULL) {
|
||||||
|
pResBlockInfo->freeBlock = true;
|
||||||
|
pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity);
|
||||||
|
} else {
|
||||||
|
pResBlockInfo->freeBlock = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
|
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
|
||||||
SSDataBlock* pResBlock, const char* idstr) {
|
SSDataBlock* pResBlock, const char* idstr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -746,22 +771,17 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
|
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
|
||||||
pReader->suid = pCond->suid;
|
pReader->suid = pCond->suid;
|
||||||
pReader->order = pCond->order;
|
pReader->order = pCond->order;
|
||||||
pReader->capacity = capacity;
|
|
||||||
pReader->pResBlock = pResBlock;
|
|
||||||
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
|
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
|
||||||
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
|
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
|
||||||
pReader->type = pCond->type;
|
pReader->type = pCond->type;
|
||||||
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||||
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
|
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
|
||||||
|
|
||||||
if (pReader->pResBlock == NULL) {
|
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
|
||||||
pReader->freeBlock = true;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
|
|
||||||
if (pReader->pResBlock == NULL) {
|
|
||||||
code = terrno;
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (pCond->numOfCols <= 0) {
|
if (pCond->numOfCols <= 0) {
|
||||||
tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
|
tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
|
||||||
|
@ -792,7 +812,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->pResBlock->pDataBlock, pSup->slotId[0]);
|
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->resBlockInfo.pResBlock->pDataBlock, pSup->slotId[0]);
|
||||||
int32_t type = pReader->status.pPrimaryTsCol->info.type;
|
int32_t type = pReader->status.pPrimaryTsCol->info.type;
|
||||||
if (type != TSDB_DATA_TYPE_TIMESTAMP) {
|
if (type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
tsdbError("the first column isn't primary timestamp in result block, actual: %s, %s", tDataTypes[type].name,
|
tsdbError("the first column isn't primary timestamp in result block, actual: %s, %s", tDataTypes[type].name,
|
||||||
|
@ -1221,7 +1241,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
||||||
SBlockData* pBlockData = &pStatus->fileBlockData;
|
SBlockData* pBlockData = &pStatus->fileBlockData;
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||||
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
int32_t numOfOutputCols = pSupInfo->numOfCols;
|
int32_t numOfOutputCols = pSupInfo->numOfCols;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -1269,8 +1289,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
endIndex += step;
|
endIndex += step;
|
||||||
int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
||||||
if (dumpedRows > pReader->capacity) { // output buffer check
|
if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check
|
||||||
dumpedRows = pReader->capacity;
|
dumpedRows = pReader->resBlockInfo.capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
@ -1785,7 +1805,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast);
|
pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
|
pInfo->moreThanCapcity = pBlock->nRow > pReader->resBlockInfo.capacity;
|
||||||
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
|
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
|
||||||
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
|
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
|
||||||
}
|
}
|
||||||
|
@ -1832,10 +1852,10 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);
|
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);
|
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);
|
||||||
pBlock->info.id.uid = pBlockScanInfo->uid;
|
pBlock->info.id.uid = pBlockScanInfo->uid;
|
||||||
|
@ -1866,7 +1886,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
||||||
|
|
||||||
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
||||||
if (nextKey != key) { // merge is not needed
|
if (nextKey != key) { // merge is not needed
|
||||||
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1913,7 +1933,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
if (next1 != ts) {
|
if (next1 != ts) {
|
||||||
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1922,7 +1942,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2120,7 +2140,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
@ -2170,7 +2190,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
@ -2197,7 +2217,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
@ -2257,7 +2277,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
@ -2475,7 +2495,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
@ -2658,7 +2678,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
@ -2740,7 +2760,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
|
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
|
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
|
||||||
pResBlock->info.dataLoad = 1;
|
pResBlock->info.dataLoad = 1;
|
||||||
|
@ -2755,7 +2775,7 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlock
|
||||||
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||||
|
@ -2777,7 +2797,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
// it is a clean block, load it directly
|
// it is a clean block, load it directly
|
||||||
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
|
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
|
||||||
pBlock->nRow <= pReader->capacity) {
|
pBlock->nRow <= pReader->resBlockInfo.capacity) {
|
||||||
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
|
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
|
||||||
code = copyBlockDataToSDataBlock(pReader);
|
code = copyBlockDataToSDataBlock(pReader);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -2841,7 +2861,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResBlock->info.rows >= pReader->capacity) {
|
if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2974,9 +2994,15 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
||||||
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
// only check here, since the iterate data in memory is very fast.
|
||||||
|
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
|
||||||
|
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
bool hasNext = false;
|
bool hasNext = false;
|
||||||
int32_t code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
|
int32_t code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
|
||||||
if (code) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(pIndexList);
|
taosArrayDestroy(pIndexList);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3064,9 +3090,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
|
||||||
|
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// load the last data block of current table
|
// load the last data block of current table
|
||||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||||
|
|
||||||
|
@ -3098,7 +3129,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResBlock->info.rows >= pReader->capacity) {
|
if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3164,7 +3195,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
SBlockData* pBData = &pReader->status.fileBlockData;
|
SBlockData* pBData = &pReader->status.fileBlockData;
|
||||||
tBlockDataReset(pBData);
|
tBlockDataReset(pBData);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
|
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -3182,7 +3213,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResBlock->info.rows >= pReader->capacity) {
|
if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3197,7 +3228,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
pResBlock->info.rows, el, pReader->idStr);
|
pResBlock->info.rows, el, pReader->idStr);
|
||||||
}
|
}
|
||||||
} else { // whole block is required, return it directly
|
} else { // whole block is required, return it directly
|
||||||
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||||
pInfo->rows = pBlock->nRow;
|
pInfo->rows = pBlock->nRow;
|
||||||
pInfo->id.uid = pScanInfo->uid;
|
pInfo->id.uid = pScanInfo->uid;
|
||||||
pInfo->dataLoad = 0;
|
pInfo->dataLoad = 0;
|
||||||
|
@ -3373,7 +3404,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pResBlock->info.rows > 0) {
|
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3456,7 +3487,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pResBlock->info.rows > 0) {
|
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3481,7 +3512,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pResBlock->info.rows > 0) {
|
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3534,7 +3565,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pResBlock->info.rows > 0) {
|
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4173,7 +4204,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
||||||
|
|
||||||
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
@ -4281,7 +4312,7 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
|
||||||
return metaGetIvtIdx(pMeta);
|
return metaGetIvtIdx(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
uint64_t tsdbGetReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
||||||
|
|
||||||
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
@ -4417,7 +4448,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->suspended = true;
|
pReader->flag = READER_STATUS_SUSPEND;
|
||||||
|
|
||||||
if (countOnly) {
|
if (countOnly) {
|
||||||
pReader->readMode = READ_MODE_COUNT_ONLY;
|
pReader->readMode = READ_MODE_COUNT_ONLY;
|
||||||
|
@ -4484,8 +4515,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->freeBlock) {
|
if (pReader->resBlockInfo.freeBlock) {
|
||||||
pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
|
pReader->resBlockInfo.pResBlock = blockDataDestroy(pReader->resBlockInfo.pResBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pSupInfo->colId);
|
taosMemoryFree(pSupInfo->colId);
|
||||||
|
@ -4622,7 +4653,7 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
|
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
|
||||||
if (pBlockScanInfo) {
|
if (pBlockScanInfo) {
|
||||||
// save lastKey to restore memory iterator
|
// save lastKey to restore memory iterator
|
||||||
STimeWindow w = pReader->pResBlock->info.window;
|
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
|
||||||
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;
|
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;
|
||||||
|
|
||||||
// reset current current table's data block scan info,
|
// reset current current table's data block scan info,
|
||||||
|
@ -4646,8 +4677,7 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
|
|
||||||
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
|
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
|
||||||
pReader->pReadSnap = NULL;
|
pReader->pReadSnap = NULL;
|
||||||
|
pReader->flag = READER_STATUS_SUSPEND;
|
||||||
pReader->suspended = true;
|
|
||||||
|
|
||||||
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
|
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
|
@ -4664,7 +4694,7 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
|
||||||
|
|
||||||
code = tsdbTryAcquireReader(pReader);
|
code = tsdbTryAcquireReader(pReader);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (pReader->suspended) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4707,10 +4737,10 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
|
||||||
STsdbReader* pNextReader = pReader->innerReader[1];
|
STsdbReader* pNextReader = pReader->innerReader[1];
|
||||||
|
|
||||||
// we need only one row
|
// we need only one row
|
||||||
pPrevReader->capacity = 1;
|
pPrevReader->resBlockInfo.capacity = 1;
|
||||||
setSharedPtr(pPrevReader, pReader);
|
setSharedPtr(pPrevReader, pReader);
|
||||||
|
|
||||||
pNextReader->capacity = 1;
|
pNextReader->resBlockInfo.capacity = 1;
|
||||||
setSharedPtr(pNextReader, pReader);
|
setSharedPtr(pNextReader, pReader);
|
||||||
|
|
||||||
code = doOpenReaderImpl(pPrevReader);
|
code = doOpenReaderImpl(pPrevReader);
|
||||||
|
@ -4720,8 +4750,7 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->suspended = false;
|
pReader->flag = READER_STATUS_NORMAL;
|
||||||
|
|
||||||
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
|
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
|
||||||
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
|
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
|
@ -4733,7 +4762,7 @@ _err:
|
||||||
|
|
||||||
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
if (pReader->status.loadFromFile == false) {
|
if (pReader->status.loadFromFile == false) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -4762,7 +4791,7 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// cleanup the data that belongs to the previous data block
|
// cleanup the data that belongs to the previous data block
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
*hasNext = false;
|
*hasNext = false;
|
||||||
|
@ -4809,7 +4838,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
|
||||||
code = tsdbAcquireReader(pReader);
|
code = tsdbAcquireReader(pReader);
|
||||||
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
|
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
|
||||||
|
|
||||||
if (pReader->suspended) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
tsdbReaderResume(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4947,7 +4976,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||||
|
|
||||||
if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
|
if (pReader->resBlockInfo.pResBlock->info.id.uid != pFBlock->uid) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4973,8 +5002,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
|
|
||||||
pTsAgg->numOfNull = 0;
|
pTsAgg->numOfNull = 0;
|
||||||
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
pTsAgg->min = pReader->pResBlock->info.window.skey;
|
pTsAgg->min = pReader->resBlockInfo.pResBlock->info.window.skey;
|
||||||
pTsAgg->max = pReader->pResBlock->info.window.ekey;
|
pTsAgg->max = pReader->resBlockInfo.pResBlock->info.window.ekey;
|
||||||
|
|
||||||
// update the number of NULL data rows
|
// update the number of NULL data rows
|
||||||
size_t numOfCols = pSup->numOfCols;
|
size_t numOfCols = pSup->numOfCols;
|
||||||
|
@ -4985,7 +5014,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
||||||
taosArrayEnsureCap(pSup->pColAgg, colsNum);
|
taosArrayEnsureCap(pSup->pColAgg, colsNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
if (pResBlock->pBlockAgg == NULL) {
|
if (pResBlock->pBlockAgg == NULL) {
|
||||||
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
||||||
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
|
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
|
||||||
|
@ -5056,7 +5085,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pReader->pResBlock;
|
return pReader->resBlockInfo.pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
|
@ -5071,7 +5100,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pTReader->status;
|
SReaderStatus* pStatus = &pTReader->status;
|
||||||
if (pStatus->composedDataBlock) {
|
if (pStatus->composedDataBlock) {
|
||||||
return pTReader->pResBlock;
|
return pTReader->resBlockInfo.pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
|
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
|
||||||
|
@ -5086,7 +5115,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
|
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
|
||||||
tsdbAcquireReader(pReader);
|
tsdbAcquireReader(pReader);
|
||||||
|
|
||||||
if (pReader->suspended) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
tsdbReaderResume(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5167,7 +5196,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
||||||
|
|
||||||
// find the start data block in file
|
// find the start data block in file
|
||||||
tsdbAcquireReader(pReader);
|
tsdbAcquireReader(pReader);
|
||||||
if (pReader->suspended) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
tsdbReaderResume(pReader);
|
||||||
}
|
}
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
@ -5240,7 +5269,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
tsdbAcquireReader(pReader);
|
tsdbAcquireReader(pReader);
|
||||||
if (pReader->suspended) {
|
if (pReader->flag == READER_STATUS_SUSPEND) {
|
||||||
tsdbReaderResume(pReader);
|
tsdbReaderResume(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5406,3 +5435,5 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
|
||||||
taosMemoryFreeClear(pReader->idStr);
|
taosMemoryFreeClear(pReader->idStr);
|
||||||
pReader->idStr = taosStrdup(idstr);
|
pReader->idStr = taosStrdup(idstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->flag = READER_STATUS_SHOULD_STOP; }
|
||||||
|
|
|
@ -179,7 +179,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
||||||
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||||
if (NULL == pDeleter->nextOutput.pData) {
|
if (NULL == pDeleter->nextOutput.pData) {
|
||||||
assert(pDeleter->queryEnd);
|
ASSERT(pDeleter->queryEnd);
|
||||||
pOutput->useconds = pDeleter->useconds;
|
pOutput->useconds = pDeleter->useconds;
|
||||||
pOutput->precision = pDeleter->pSchema->precision;
|
pOutput->precision = pDeleter->pSchema->precision;
|
||||||
pOutput->bufStatus = DS_BUF_EMPTY;
|
pOutput->bufStatus = DS_BUF_EMPTY;
|
||||||
|
|
|
@ -181,7 +181,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
||||||
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
if (NULL == pDispatcher->nextOutput.pData) {
|
if (NULL == pDispatcher->nextOutput.pData) {
|
||||||
assert(pDispatcher->queryEnd);
|
ASSERT(pDispatcher->queryEnd);
|
||||||
pOutput->useconds = pDispatcher->useconds;
|
pOutput->useconds = pDispatcher->useconds;
|
||||||
pOutput->precision = pDispatcher->pSchema->precision;
|
pOutput->precision = pDispatcher->pSchema->precision;
|
||||||
pOutput->bufStatus = DS_BUF_EMPTY;
|
pOutput->bufStatus = DS_BUF_EMPTY;
|
||||||
|
|
|
@ -88,7 +88,6 @@ void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
|
||||||
|
|
||||||
// TODO refactor: use macro
|
// TODO refactor: use macro
|
||||||
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
|
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
|
||||||
assert(index >= 0 && offset != NULL);
|
|
||||||
return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
|
return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -926,7 +926,6 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
|
||||||
|
|
||||||
SResultRow* pResultRow =
|
SResultRow* pResultRow =
|
||||||
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false);
|
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false);
|
||||||
assert(pResultRow != NULL);
|
|
||||||
|
|
||||||
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
|
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -156,7 +156,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(w.skey < pBlockInfo->window.skey);
|
ASSERT(w.skey < pBlockInfo->window.skey);
|
||||||
if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
|
if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1855,7 +1855,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
|
uint64_t version = tsdbGetReaderMaxVersion(pTableScanInfo->base.dataReader);
|
||||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
|
@ -2021,7 +2021,7 @@ FETCH_NEXT_BLOCK:
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
|
uint64_t version = tsdbGetReaderMaxVersion(pTableScanInfo->base.dataReader);
|
||||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
|
@ -2426,7 +2426,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
|
|
||||||
if (pHandle->initTqReader) {
|
if (pHandle->initTqReader) {
|
||||||
ASSERT(pHandle->tqReader == NULL);
|
ASSERT(pHandle->tqReader == NULL);
|
||||||
pInfo->tqReader = tqOpenReader(pHandle->vnode);
|
pInfo->tqReader = tqReaderOpen(pHandle->vnode);
|
||||||
ASSERT(pInfo->tqReader);
|
ASSERT(pInfo->tqReader);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pHandle->tqReader);
|
ASSERT(pHandle->tqReader);
|
||||||
|
|
|
@ -408,7 +408,7 @@ static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int
|
||||||
|
|
||||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||||
|
|
||||||
assert(pFillInfo->numOfCurrent == resultCapacity);
|
ASSERT(pFillInfo->numOfCurrent == resultCapacity);
|
||||||
return resultCapacity;
|
return resultCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -558,7 +558,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
|
||||||
numOfRes = taosTimeCountInterval(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
|
numOfRes = taosTimeCountInterval(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
|
||||||
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
|
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
|
||||||
numOfRes += 1;
|
numOfRes += 1;
|
||||||
assert(numOfRes >= numOfRows);
|
ASSERT(numOfRes >= numOfRows);
|
||||||
} else { // reach the end of data
|
} else { // reach the end of data
|
||||||
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
|
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||||
(ekey1 >= pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
|
(ekey1 >= pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||||
|
@ -593,14 +593,14 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca
|
||||||
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
||||||
|
|
||||||
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
|
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
|
||||||
assert(numOfRes <= capacity);
|
ASSERT(numOfRes <= capacity);
|
||||||
|
|
||||||
// no data existed for fill operation now, append result according to the fill strategy
|
// no data existed for fill operation now, append result according to the fill strategy
|
||||||
if (remain == 0) {
|
if (remain == 0) {
|
||||||
appendFilledResult(pFillInfo, p, numOfRes);
|
appendFilledResult(pFillInfo, p, numOfRes);
|
||||||
} else {
|
} else {
|
||||||
fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
|
fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
|
||||||
assert(numOfRes == pFillInfo->numOfCurrent);
|
ASSERT(numOfRes == pFillInfo->numOfCurrent);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
|
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
|
||||||
|
|
|
@ -153,7 +153,7 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(forwardRows >= 0);
|
ASSERT(forwardRows >= 0);
|
||||||
return forwardRows;
|
return forwardRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,8 +165,6 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
|
||||||
|
|
||||||
TSKEY* keyList = (TSKEY*)pValue;
|
TSKEY* keyList = (TSKEY*)pValue;
|
||||||
int32_t firstPos = 0;
|
int32_t firstPos = 0;
|
||||||
int32_t lastPos = num - 1;
|
int32_t lastPos = num - 1;
|
||||||
|
@ -230,7 +228,7 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
|
|
||||||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
||||||
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
|
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
|
||||||
assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
|
ASSERT(startPos >= 0 && startPos < pDataBlockInfo->rows);
|
||||||
|
|
||||||
int32_t num = -1;
|
int32_t num = -1;
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||||
|
@ -261,7 +259,6 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(num >= 0);
|
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,7 +430,7 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nextRowIndex = endRowIndex + 1;
|
int32_t nextRowIndex = endRowIndex + 1;
|
||||||
assert(nextRowIndex >= 0);
|
ASSERT(nextRowIndex >= 0);
|
||||||
|
|
||||||
TSKEY nextKey = tsCols[nextRowIndex];
|
TSKEY nextKey = tsCols[nextRowIndex];
|
||||||
doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
|
doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
|
||||||
|
@ -494,9 +491,9 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
|
||||||
*/
|
*/
|
||||||
if (primaryKeys == NULL) {
|
if (primaryKeys == NULL) {
|
||||||
if (ascQuery) {
|
if (ascQuery) {
|
||||||
assert(pDataBlockInfo->window.skey <= pNext->ekey);
|
ASSERT(pDataBlockInfo->window.skey <= pNext->ekey);
|
||||||
} else {
|
} else {
|
||||||
assert(pDataBlockInfo->window.ekey >= pNext->skey);
|
ASSERT(pDataBlockInfo->window.ekey >= pNext->skey);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
|
if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
|
||||||
|
@ -533,7 +530,6 @@ static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType typ
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
|
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
|
||||||
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
|
|
||||||
if (type == RESULT_ROW_START_INTERP) {
|
if (type == RESULT_ROW_START_INTERP) {
|
||||||
pResult->startInterp = true;
|
pResult->startInterp = true;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -229,7 +229,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
||||||
taosArrayPush(pPageIdList, &pageId);
|
taosArrayPush(pPageIdList, &pageId);
|
||||||
|
|
||||||
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
|
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
|
||||||
assert(size <= getBufPageSize(pHandle->pBuf));
|
ASSERT(size <= getBufPageSize(pHandle->pBuf));
|
||||||
|
|
||||||
blockDataToBuf(pPage, p);
|
blockDataToBuf(pPage, p);
|
||||||
|
|
||||||
|
@ -592,7 +592,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
|
|
||||||
int32_t size =
|
int32_t size =
|
||||||
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
|
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
|
||||||
assert(size <= getBufPageSize(pHandle->pBuf));
|
ASSERT(size <= getBufPageSize(pHandle->pBuf));
|
||||||
|
|
||||||
blockDataToBuf(pPage, pDataBlock);
|
blockDataToBuf(pPage, pDataBlock);
|
||||||
|
|
||||||
|
|
|
@ -188,7 +188,6 @@ void dfaAdd(FstDfa *dfa, FstSparseSet *set, uint32_t ip) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
bool succ = sparSetAdd(set, ip, NULL);
|
bool succ = sparSetAdd(set, ip, NULL);
|
||||||
// assert(succ == true);
|
|
||||||
Inst *inst = taosArrayGet(dfa->insts, ip);
|
Inst *inst = taosArrayGet(dfa->insts, ip);
|
||||||
if (inst->ty == MATCH || inst->ty == RANGE) {
|
if (inst->ty == MATCH || inst->ty == RANGE) {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
|
|
@ -1662,73 +1662,6 @@ int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
||||||
return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
|
return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
|
|
||||||
switch(type) {
|
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
|
||||||
case TSDB_DATA_TYPE_UTINYINT:{
|
|
||||||
int8_t* p = (int8_t*) dest;
|
|
||||||
int8_t* pSrc = (int8_t*) src;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
p[i] = pSrc[numOfRows - i - 1];
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
case TSDB_DATA_TYPE_USMALLINT:{
|
|
||||||
int16_t* p = (int16_t*) dest;
|
|
||||||
int16_t* pSrc = (int16_t*) src;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
p[i] = pSrc[numOfRows - i - 1];
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
|
||||||
case TSDB_DATA_TYPE_UINT: {
|
|
||||||
int32_t* p = (int32_t*) dest;
|
|
||||||
int32_t* pSrc = (int32_t*) src;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
p[i] = pSrc[numOfRows - i - 1];
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
|
||||||
case TSDB_DATA_TYPE_UBIGINT: {
|
|
||||||
int64_t* p = (int64_t*) dest;
|
|
||||||
int64_t* pSrc = (int64_t*) src;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
p[i] = pSrc[numOfRows - i - 1];
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
case TSDB_DATA_TYPE_FLOAT: {
|
|
||||||
float* p = (float*) dest;
|
|
||||||
float* pSrc = (float*) src;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
p[i] = pSrc[numOfRows - i - 1];
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
case TSDB_DATA_TYPE_DOUBLE: {
|
|
||||||
double* p = (double*) dest;
|
|
||||||
double* pSrc = (double*) src;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
p[i] = pSrc[numOfRows - i - 1];
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
default: assert(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
bool getTimePseudoFuncEnv(SFunctionNode *UNUSED_PARAM(pFunc), SFuncExecEnv *pEnv) {
|
bool getTimePseudoFuncEnv(SFunctionNode *UNUSED_PARAM(pFunc), SFuncExecEnv *pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(int64_t);
|
pEnv->calcMemSize = sizeof(int64_t);
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -69,8 +69,6 @@ SArray* taosArrayInit_s(size_t elemSize, size_t initialSize) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosArrayResize(SArray* pArray) {
|
static int32_t taosArrayResize(SArray* pArray) {
|
||||||
assert(pArray->size >= pArray->capacity);
|
|
||||||
|
|
||||||
size_t size = pArray->capacity;
|
size_t size = pArray->capacity;
|
||||||
size = (size << 1u);
|
size = (size << 1u);
|
||||||
|
|
||||||
|
@ -252,12 +250,12 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArraySet(SArray* pArray, size_t index, void* pData) {
|
void taosArraySet(SArray* pArray, size_t index, void* pData) {
|
||||||
assert(index < pArray->size);
|
ASSERT(index < pArray->size);
|
||||||
memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize);
|
memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
|
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
|
||||||
assert(cnt <= pArray->size);
|
ASSERT(cnt <= pArray->size);
|
||||||
pArray->size = pArray->size - cnt;
|
pArray->size = pArray->size - cnt;
|
||||||
if (pArray->size == 0 || cnt == 0) {
|
if (pArray->size == 0 || cnt == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -266,12 +264,15 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {
|
void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {
|
||||||
assert(cnt <= pArray->size);
|
if (cnt >= pArray->size) {
|
||||||
|
cnt = pArray->size;
|
||||||
|
}
|
||||||
|
|
||||||
pArray->size = pArray->size - cnt;
|
pArray->size = pArray->size - cnt;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayRemove(SArray* pArray, size_t index) {
|
void taosArrayRemove(SArray* pArray, size_t index) {
|
||||||
assert(index < pArray->size);
|
ASSERT(index < pArray->size);
|
||||||
|
|
||||||
if (index == pArray->size - 1) {
|
if (index == pArray->size - 1) {
|
||||||
taosArrayPop(pArray);
|
taosArrayPop(pArray);
|
||||||
|
@ -483,7 +484,8 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
|
||||||
// todo remove it
|
// todo remove it
|
||||||
// order array<type *>
|
// order array<type *>
|
||||||
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
|
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
|
||||||
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
taosqsort(pArray->pData, pArray->size, pArray->elemSize, param, fn);
|
||||||
|
// taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArraySwap(SArray* a, SArray* b) {
|
void taosArraySwap(SArray* a, SArray* b) {
|
||||||
|
|
|
@ -243,11 +243,6 @@ static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STr
|
||||||
if (next) {
|
if (next) {
|
||||||
next->prev = pElem->prev;
|
next->prev = pElem->prev;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCacheObj->numOfElemsInTrash == 0) {
|
|
||||||
assert(pCacheObj->pTrash == NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,8 +256,6 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem
|
||||||
}
|
}
|
||||||
|
|
||||||
static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
|
static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
|
||||||
assert(pNode != NULL && pEntry != NULL);
|
|
||||||
|
|
||||||
pNode->pNext = pEntry->next;
|
pNode->pNext = pEntry->next;
|
||||||
pEntry->next = pNode;
|
pEntry->next = pNode;
|
||||||
pEntry->num += 1;
|
pEntry->num += 1;
|
||||||
|
@ -503,7 +496,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||||
uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
|
uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
|
||||||
|
|
||||||
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
||||||
assert(ref >= 2);
|
ASSERT(ref >= 2);
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -516,7 +509,6 @@ void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(T_REF_VAL_GET(ptNode) >= 1);
|
|
||||||
char *d = *data;
|
char *d = *data;
|
||||||
|
|
||||||
// clear its reference to old area
|
// clear its reference to old area
|
||||||
|
@ -575,19 +567,19 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
if (ref == 1) {
|
if (ref == 1) {
|
||||||
// If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
|
// If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
|
||||||
// destroyed by refresh worker if decrease ref count before removing it from linked-list.
|
// destroyed by refresh worker if decrease ref count before removing it from linked-list.
|
||||||
assert(pNode->pTNodeHeader->pData == pNode);
|
ASSERT(pNode->pTNodeHeader->pData == pNode);
|
||||||
|
|
||||||
__trashcan_wr_lock(pCacheObj);
|
__trashcan_wr_lock(pCacheObj);
|
||||||
doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
|
doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
|
||||||
__trashcan_unlock(pCacheObj);
|
__trashcan_unlock(pCacheObj);
|
||||||
|
|
||||||
ref = T_REF_DEC(pNode);
|
ref = T_REF_DEC(pNode);
|
||||||
assert(ref == 0);
|
ASSERT(ref == 0);
|
||||||
|
|
||||||
doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
|
doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
|
||||||
} else {
|
} else {
|
||||||
ref = T_REF_DEC(pNode);
|
ref = T_REF_DEC(pNode);
|
||||||
assert(ref >= 0);
|
ASSERT(ref >= 0);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
|
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
|
||||||
|
@ -609,13 +601,13 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
"others already, prev must in trashcan",
|
"others already, prev must in trashcan",
|
||||||
pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data, T_REF_VAL_GET(pNode));
|
pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data, T_REF_VAL_GET(pNode));
|
||||||
|
|
||||||
assert(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL);
|
ASSERT(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL);
|
||||||
} else {
|
} else {
|
||||||
removeNodeInEntryList(pe, prev, p);
|
removeNodeInEntryList(pe, prev, p);
|
||||||
uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
|
uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
|
||||||
pNode->data, ref);
|
pNode->data, ref);
|
||||||
if (ref > 0) {
|
if (ref > 0) {
|
||||||
assert(pNode->pTNodeHeader == NULL);
|
ASSERT(pNode->pTNodeHeader == NULL);
|
||||||
taosAddToTrashcan(pCacheObj, pNode);
|
taosAddToTrashcan(pCacheObj, pNode);
|
||||||
} else { // ref == 0
|
} else { // ref == 0
|
||||||
atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
|
atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
|
||||||
|
@ -736,7 +728,7 @@ SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pDat
|
||||||
|
|
||||||
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) {
|
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) {
|
||||||
if (pNode->inTrashcan) { /* node is already in trash */
|
if (pNode->inTrashcan) { /* node is already in trash */
|
||||||
assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
|
ASSERT(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -782,7 +774,7 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||||
STrashElem *pElem = pCacheObj->pTrash;
|
STrashElem *pElem = pCacheObj->pTrash;
|
||||||
while (pElem) {
|
while (pElem) {
|
||||||
T_REF_VAL_CHECK(pElem->pData);
|
T_REF_VAL_CHECK(pElem->pData);
|
||||||
assert(pElem->next != pElem && pElem->prev != pElem);
|
ASSERT(pElem->next != pElem && pElem->prev != pElem);
|
||||||
|
|
||||||
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
|
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
|
||||||
uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
|
uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
|
||||||
|
@ -814,8 +806,6 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
|
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
|
||||||
assert(pCacheObj != NULL);
|
|
||||||
|
|
||||||
SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
||||||
doTraverseElems(pCacheObj, doRemoveExpiredFn, &sup);
|
doTraverseElems(pCacheObj, doRemoveExpiredFn, &sup);
|
||||||
}
|
}
|
||||||
|
@ -827,9 +817,7 @@ void taosCacheRefreshWorkerUnexpectedStopped(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosCacheTimedRefresh(void *handle) {
|
void *taosCacheTimedRefresh(void *handle) {
|
||||||
assert(pCacheArrayList != NULL);
|
|
||||||
uDebug("cache refresh thread starts");
|
uDebug("cache refresh thread starts");
|
||||||
|
|
||||||
setThreadName("cacheRefresh");
|
setThreadName("cacheRefresh");
|
||||||
|
|
||||||
const int32_t SLEEP_DURATION = 500; // 500 ms
|
const int32_t SLEEP_DURATION = 500; // 500 ms
|
||||||
|
|
|
@ -150,7 +150,6 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
|
||||||
//atomic_add_fetch_64(&pHashObj->compTimes, 1);
|
//atomic_add_fetch_64(&pHashObj->compTimes, 1);
|
||||||
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
||||||
pNode->removed == 0) {
|
pNode->removed == 0) {
|
||||||
assert(pNode->hashVal == hashVal);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,8 +188,6 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p
|
||||||
*/
|
*/
|
||||||
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry *pe, SHashNode *prev, SHashNode *pNode,
|
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry *pe, SHashNode *prev, SHashNode *pNode,
|
||||||
SHashNode *pNewNode) {
|
SHashNode *pNewNode) {
|
||||||
assert(pNode->keyLen == pNewNode->keyLen);
|
|
||||||
|
|
||||||
atomic_sub_fetch_16(&pNode->refCount, 1);
|
atomic_sub_fetch_16(&pNode->refCount, 1);
|
||||||
if (prev != NULL) {
|
if (prev != NULL) {
|
||||||
prev->next = pNewNode;
|
prev->next = pNewNode;
|
||||||
|
@ -236,7 +233,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { return t
|
||||||
|
|
||||||
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
|
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
|
||||||
if (fn == NULL) {
|
if (fn == NULL) {
|
||||||
assert(0);
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,19 +339,11 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
|
||||||
taosHashEntryWLock(pHashObj, pe);
|
taosHashEntryWLock(pHashObj, pe);
|
||||||
|
|
||||||
SHashNode *pNode = pe->next;
|
SHashNode *pNode = pe->next;
|
||||||
#if 0
|
|
||||||
if (pe->num > 0) {
|
|
||||||
assert(pNode != NULL);
|
|
||||||
} else {
|
|
||||||
assert(pNode == NULL);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SHashNode *prev = NULL;
|
SHashNode *prev = NULL;
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if ((pNode->keyLen == keyLen) && (*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
|
if ((pNode->keyLen == keyLen) && (*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
|
||||||
pNode->removed == 0) {
|
pNode->removed == 0) {
|
||||||
assert(pNode->hashVal == hashVal);
|
ASSERT(pNode->hashVal == hashVal);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -370,8 +359,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
|
||||||
}
|
}
|
||||||
|
|
||||||
pushfrontNodeInEntryList(pe, pNewNode);
|
pushfrontNodeInEntryList(pe, pNewNode);
|
||||||
assert(pe->next != NULL);
|
|
||||||
|
|
||||||
taosHashEntryWUnlock(pHashObj, pe);
|
taosHashEntryWUnlock(pHashObj, pe);
|
||||||
|
|
||||||
// enable resize
|
// enable resize
|
||||||
|
@ -446,14 +433,6 @@ void *taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void *
|
||||||
char *data = NULL;
|
char *data = NULL;
|
||||||
taosHashEntryRLock(pHashObj, pe);
|
taosHashEntryRLock(pHashObj, pe);
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (pe->num > 0) {
|
|
||||||
assert(pe->next != NULL);
|
|
||||||
} else {
|
|
||||||
assert(pe->next == NULL);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal);
|
SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal);
|
||||||
if (pNode != NULL) {
|
if (pNode != NULL) {
|
||||||
if (pHashObj->callbackFp != NULL) {
|
if (pHashObj->callbackFp != NULL) {
|
||||||
|
@ -514,8 +493,6 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
|
|
||||||
// double check after locked
|
// double check after locked
|
||||||
if (pe->num == 0) {
|
if (pe->num == 0) {
|
||||||
assert(pe->next == NULL);
|
|
||||||
|
|
||||||
taosHashEntryWUnlock(pHashObj, pe);
|
taosHashEntryWUnlock(pHashObj, pe);
|
||||||
taosHashRUnlock(pHashObj);
|
taosHashRUnlock(pHashObj);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -568,13 +545,10 @@ void taosHashClear(SHashObj *pHashObj) {
|
||||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||||
if (pEntry->num == 0) {
|
if (pEntry->num == 0) {
|
||||||
assert(pEntry->next == NULL);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode = pEntry->next;
|
pNode = pEntry->next;
|
||||||
assert(pNode != NULL);
|
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
pNext = pNode->next;
|
pNext = pNode->next;
|
||||||
FREE_HASH_NODE(pHashObj->freeFp, pNode);
|
FREE_HASH_NODE(pHashObj->freeFp, pNode);
|
||||||
|
@ -671,14 +645,11 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
||||||
SHashNode *pPrev = NULL;
|
SHashNode *pPrev = NULL;
|
||||||
|
|
||||||
if (pe->num == 0) {
|
if (pe->num == 0) {
|
||||||
assert(pe->next == NULL);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode = pe->next;
|
pNode = pe->next;
|
||||||
|
|
||||||
assert(pNode != NULL);
|
|
||||||
|
|
||||||
while (pNode != NULL) {
|
while (pNode != NULL) {
|
||||||
int32_t newIdx = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
int32_t newIdx = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||||
pNext = pNode->next;
|
pNext = pNode->next;
|
||||||
|
@ -728,8 +699,6 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
|
||||||
}
|
}
|
||||||
|
|
||||||
void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode) {
|
void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode) {
|
||||||
assert(pNode != NULL && pEntry != NULL);
|
|
||||||
|
|
||||||
pNode->next = pEntry->next;
|
pNode->next = pEntry->next;
|
||||||
pEntry->next = pNode;
|
pEntry->next = pNode;
|
||||||
|
|
||||||
|
|
|
@ -104,7 +104,6 @@ SListNode *tdListPopNode(SList *list, SListNode *node) {
|
||||||
|
|
||||||
// Move all node elements from src to dst, the dst is assumed as an empty list
|
// Move all node elements from src to dst, the dst is assumed as an empty list
|
||||||
void tdListMove(SList *src, SList *dst) {
|
void tdListMove(SList *src, SList *dst) {
|
||||||
// assert(dst->eleSize == src->eleSize);
|
|
||||||
SListNode *node = NULL;
|
SListNode *node = NULL;
|
||||||
while ((node = tdListPopHead(src)) != NULL) {
|
while ((node = tdListPopHead(src)) != NULL) {
|
||||||
tdListAppendNode(dst, node);
|
tdListAppendNode(dst, node);
|
||||||
|
|
|
@ -85,13 +85,13 @@ struct SLRUEntry {
|
||||||
#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs)
|
#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs)
|
||||||
|
|
||||||
static bool taosLRUEntryUnref(SLRUEntry *entry) {
|
static bool taosLRUEntryUnref(SLRUEntry *entry) {
|
||||||
assert(entry->refs > 0);
|
ASSERT(entry->refs > 0);
|
||||||
--entry->refs;
|
--entry->refs;
|
||||||
return entry->refs == 0;
|
return entry->refs == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosLRUEntryFree(SLRUEntry *entry) {
|
static void taosLRUEntryFree(SLRUEntry *entry) {
|
||||||
assert(entry->refs == 0);
|
ASSERT(entry->refs == 0);
|
||||||
|
|
||||||
if (entry->deleter) {
|
if (entry->deleter) {
|
||||||
(*entry->deleter)(entry->keyData, entry->keyLength, entry->value);
|
(*entry->deleter)(entry->keyData, entry->keyLength, entry->value);
|
||||||
|
@ -127,7 +127,7 @@ static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t
|
||||||
SLRUEntry *h = table->list[i];
|
SLRUEntry *h = table->list[i];
|
||||||
while (h) {
|
while (h) {
|
||||||
SLRUEntry *n = h->nextHash;
|
SLRUEntry *n = h->nextHash;
|
||||||
assert(TAOS_LRU_ENTRY_IN_CACHE(h));
|
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(h));
|
||||||
func(h);
|
func(h);
|
||||||
h = n;
|
h = n;
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ static void taosLRUEntryTableResize(SLRUEntryTable *table) {
|
||||||
++count;
|
++count;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert(table->elems == count);
|
ASSERT(table->elems == count);
|
||||||
|
|
||||||
taosMemoryFree(table->list);
|
taosMemoryFree(table->list);
|
||||||
table->list = newList;
|
table->list = newList;
|
||||||
|
@ -240,17 +240,16 @@ struct SLRUCacheShard {
|
||||||
static void taosLRUCacheShardMaintainPoolSize(SLRUCacheShard *shard) {
|
static void taosLRUCacheShardMaintainPoolSize(SLRUCacheShard *shard) {
|
||||||
while (shard->highPriPoolUsage > shard->highPriPoolCapacity) {
|
while (shard->highPriPoolUsage > shard->highPriPoolCapacity) {
|
||||||
shard->lruLowPri = shard->lruLowPri->next;
|
shard->lruLowPri = shard->lruLowPri->next;
|
||||||
assert(shard->lruLowPri != &shard->lru);
|
ASSERT(shard->lruLowPri != &shard->lru);
|
||||||
TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(shard->lruLowPri, false);
|
TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(shard->lruLowPri, false);
|
||||||
|
|
||||||
assert(shard->highPriPoolUsage >= shard->lruLowPri->totalCharge);
|
ASSERT(shard->highPriPoolUsage >= shard->lruLowPri->totalCharge);
|
||||||
shard->highPriPoolUsage -= shard->lruLowPri->totalCharge;
|
shard->highPriPoolUsage -= shard->lruLowPri->totalCharge;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) {
|
static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) {
|
||||||
assert(e->next == NULL);
|
ASSERT(e->next == NULL && e->prev == NULL);
|
||||||
assert(e->prev == NULL);
|
|
||||||
|
|
||||||
if (shard->highPriPoolRatio > 0 && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) {
|
if (shard->highPriPoolRatio > 0 && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) {
|
||||||
e->next = &shard->lru;
|
e->next = &shard->lru;
|
||||||
|
@ -277,8 +276,7 @@ static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) {
|
static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) {
|
||||||
assert(e->next);
|
ASSERT(e->next && e->prev);
|
||||||
assert(e->prev);
|
|
||||||
|
|
||||||
if (shard->lruLowPri == e) {
|
if (shard->lruLowPri == e) {
|
||||||
shard->lruLowPri = e->prev;
|
shard->lruLowPri = e->prev;
|
||||||
|
@ -287,10 +285,10 @@ static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) {
|
||||||
e->prev->next = e->next;
|
e->prev->next = e->next;
|
||||||
e->prev = e->next = NULL;
|
e->prev = e->next = NULL;
|
||||||
|
|
||||||
assert(shard->lruUsage >= e->totalCharge);
|
ASSERT(shard->lruUsage >= e->totalCharge);
|
||||||
shard->lruUsage -= e->totalCharge;
|
shard->lruUsage -= e->totalCharge;
|
||||||
if (TAOS_LRU_ENTRY_IN_HIGH_POOL(e)) {
|
if (TAOS_LRU_ENTRY_IN_HIGH_POOL(e)) {
|
||||||
assert(shard->highPriPoolUsage >= e->totalCharge);
|
ASSERT(shard->highPriPoolUsage >= e->totalCharge);
|
||||||
shard->highPriPoolUsage -= e->totalCharge;
|
shard->highPriPoolUsage -= e->totalCharge;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -298,13 +296,13 @@ static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) {
|
||||||
static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArray *deleted) {
|
static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArray *deleted) {
|
||||||
while (shard->usage + charge > shard->capacity && shard->lru.next != &shard->lru) {
|
while (shard->usage + charge > shard->capacity && shard->lru.next != &shard->lru) {
|
||||||
SLRUEntry *old = shard->lru.next;
|
SLRUEntry *old = shard->lru.next;
|
||||||
assert(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
|
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
|
||||||
|
|
||||||
taosLRUCacheShardLRURemove(shard, old);
|
taosLRUCacheShardLRURemove(shard, old);
|
||||||
taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
||||||
|
|
||||||
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
||||||
assert(shard->usage >= old->totalCharge);
|
ASSERT(shard->usage >= old->totalCharge);
|
||||||
shard->usage -= old->totalCharge;
|
shard->usage -= old->totalCharge;
|
||||||
|
|
||||||
taosArrayPush(deleted, &old);
|
taosArrayPush(deleted, &old);
|
||||||
|
@ -391,11 +389,11 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
|
||||||
if (old != NULL) {
|
if (old != NULL) {
|
||||||
status = TAOS_LRU_STATUS_OK_OVERWRITTEN;
|
status = TAOS_LRU_STATUS_OK_OVERWRITTEN;
|
||||||
|
|
||||||
assert(TAOS_LRU_ENTRY_IN_CACHE(old));
|
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old));
|
||||||
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
||||||
if (!TAOS_LRU_ENTRY_HAS_REFS(old)) {
|
if (!TAOS_LRU_ENTRY_HAS_REFS(old)) {
|
||||||
taosLRUCacheShardLRURemove(shard, old);
|
taosLRUCacheShardLRURemove(shard, old);
|
||||||
assert(shard->usage >= old->totalCharge);
|
ASSERT(shard->usage >= old->totalCharge);
|
||||||
shard->usage -= old->totalCharge;
|
shard->usage -= old->totalCharge;
|
||||||
|
|
||||||
taosArrayPush(lastReferenceList, &old);
|
taosArrayPush(lastReferenceList, &old);
|
||||||
|
@ -455,7 +453,7 @@ static LRUHandle *taosLRUCacheShardLookup(SLRUCacheShard *shard, const void *key
|
||||||
taosThreadMutexLock(&shard->mutex);
|
taosThreadMutexLock(&shard->mutex);
|
||||||
e = taosLRUEntryTableLookup(&shard->table, key, keyLen, hash);
|
e = taosLRUEntryTableLookup(&shard->table, key, keyLen, hash);
|
||||||
if (e != NULL) {
|
if (e != NULL) {
|
||||||
assert(TAOS_LRU_ENTRY_IN_CACHE(e));
|
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(e));
|
||||||
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
|
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
|
||||||
taosLRUCacheShardLRURemove(shard, e);
|
taosLRUCacheShardLRURemove(shard, e);
|
||||||
}
|
}
|
||||||
|
@ -474,12 +472,12 @@ static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_
|
||||||
|
|
||||||
SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, key, keyLen, hash);
|
SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, key, keyLen, hash);
|
||||||
if (e != NULL) {
|
if (e != NULL) {
|
||||||
assert(TAOS_LRU_ENTRY_IN_CACHE(e));
|
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(e));
|
||||||
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
||||||
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
|
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
|
||||||
taosLRUCacheShardLRURemove(shard, e);
|
taosLRUCacheShardLRURemove(shard, e);
|
||||||
|
|
||||||
assert(shard->usage >= e->totalCharge);
|
ASSERT(shard->usage >= e->totalCharge);
|
||||||
shard->usage -= e->totalCharge;
|
shard->usage -= e->totalCharge;
|
||||||
lastReference = true;
|
lastReference = true;
|
||||||
}
|
}
|
||||||
|
@ -499,11 +497,11 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
|
||||||
|
|
||||||
while (shard->lru.next != &shard->lru) {
|
while (shard->lru.next != &shard->lru) {
|
||||||
SLRUEntry *old = shard->lru.next;
|
SLRUEntry *old = shard->lru.next;
|
||||||
assert(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
|
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
|
||||||
taosLRUCacheShardLRURemove(shard, old);
|
taosLRUCacheShardLRURemove(shard, old);
|
||||||
taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
||||||
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
||||||
assert(shard->usage >= old->totalCharge);
|
ASSERT(shard->usage >= old->totalCharge);
|
||||||
shard->usage -= old->totalCharge;
|
shard->usage -= old->totalCharge;
|
||||||
|
|
||||||
taosArrayPush(lastReferenceList, &old);
|
taosArrayPush(lastReferenceList, &old);
|
||||||
|
@ -524,7 +522,7 @@ static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) {
|
||||||
SLRUEntry *e = (SLRUEntry *)handle;
|
SLRUEntry *e = (SLRUEntry *)handle;
|
||||||
taosThreadMutexLock(&shard->mutex);
|
taosThreadMutexLock(&shard->mutex);
|
||||||
|
|
||||||
assert(TAOS_LRU_ENTRY_HAS_REFS(e));
|
ASSERT(TAOS_LRU_ENTRY_HAS_REFS(e));
|
||||||
TAOS_LRU_ENTRY_REF(e);
|
TAOS_LRU_ENTRY_REF(e);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&shard->mutex);
|
taosThreadMutexUnlock(&shard->mutex);
|
||||||
|
@ -545,7 +543,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
|
||||||
lastReference = taosLRUEntryUnref(e);
|
lastReference = taosLRUEntryUnref(e);
|
||||||
if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) {
|
if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) {
|
||||||
if (shard->usage > shard->capacity || eraseIfLastRef) {
|
if (shard->usage > shard->capacity || eraseIfLastRef) {
|
||||||
assert(shard->lru.next == &shard->lru || eraseIfLastRef);
|
ASSERT(shard->lru.next == &shard->lru || eraseIfLastRef);
|
||||||
|
|
||||||
taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
|
taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
|
||||||
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
||||||
|
@ -557,7 +555,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastReference && e->value) {
|
if (lastReference && e->value) {
|
||||||
assert(shard->usage >= e->totalCharge);
|
ASSERT(shard->usage >= e->totalCharge);
|
||||||
shard->usage -= e->totalCharge;
|
shard->usage -= e->totalCharge;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -595,7 +593,7 @@ static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
|
||||||
|
|
||||||
taosThreadMutexLock(&shard->mutex);
|
taosThreadMutexLock(&shard->mutex);
|
||||||
|
|
||||||
assert(shard->usage >= shard->lruUsage);
|
ASSERT(shard->usage >= shard->lruUsage);
|
||||||
usage = shard->usage - shard->lruUsage;
|
usage = shard->usage - shard->lruUsage;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&shard->mutex);
|
taosThreadMutexUnlock(&shard->mutex);
|
||||||
|
@ -687,7 +685,7 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
|
||||||
if (cache) {
|
if (cache) {
|
||||||
if (cache->shards) {
|
if (cache->shards) {
|
||||||
int numShards = cache->numShards;
|
int numShards = cache->numShards;
|
||||||
assert(numShards > 0);
|
ASSERT(numShards > 0);
|
||||||
for (int i = 0; i < numShards; ++i) {
|
for (int i = 0; i < numShards; ++i) {
|
||||||
taosLRUCacheShardCleanup(&cache->shards[i]);
|
taosLRUCacheShardCleanup(&cache->shards[i]);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue