Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last
This commit is contained in:
commit
c2db42f115
|
@ -146,7 +146,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI
|
|||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||
STsdbReader* pReader);
|
||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
||||
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
|
||||
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
|
||||
int32_t rowIndex);
|
||||
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
||||
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
|
||||
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
|
||||
|
@ -2134,15 +2135,18 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
|
|||
|
||||
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||
STSchema* pTSchema = NULL;
|
||||
if (sversion != pReader->pSchema->version) {
|
||||
if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) {
|
||||
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
|
||||
if (pReader->pSchema == NULL) {
|
||||
pReader->pSchema = pTSchema;
|
||||
}
|
||||
} else {
|
||||
pTSchema = pReader->pSchema;
|
||||
}
|
||||
|
||||
tRowMergerAdd(pMerger, pRow, pTSchema);
|
||||
|
||||
if (sversion != pReader->pSchema->version) {
|
||||
if (pTSchema != pReader->pSchema) {
|
||||
taosMemoryFree(pTSchema);
|
||||
}
|
||||
}
|
||||
|
@ -2271,8 +2275,11 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
|
|||
// updateSchema(pRow, uid, pReader);
|
||||
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||
STSchema* pTSchema = NULL;
|
||||
if (sversion != pReader->pSchema->version) {
|
||||
if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) {
|
||||
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
|
||||
if (pReader->pSchema == NULL) {
|
||||
pReader->pSchema = pTSchema;
|
||||
}
|
||||
} else {
|
||||
pTSchema = pReader->pSchema;
|
||||
}
|
||||
|
@ -2282,7 +2289,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
|
|||
tRowMergerGetRow(&merge, pTSRow);
|
||||
tRowMergerClear(&merge);
|
||||
|
||||
if (sversion != pReader->pSchema->version) {
|
||||
if (pTSchema != pReader->pSchema) {
|
||||
taosMemoryFree(pTSchema);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -416,6 +416,7 @@ typedef struct SCtgCacheOperation {
|
|||
bool syncOp;
|
||||
tsem_t rspSem;
|
||||
bool stopQueue;
|
||||
bool unLocked;
|
||||
} SCtgCacheOperation;
|
||||
|
||||
typedef struct SCtgQNode {
|
||||
|
|
|
@ -674,7 +674,13 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
|
|||
tsem_post(&gCtgMgmt.queue.reqSem);
|
||||
|
||||
if (syncOp) {
|
||||
if (!operation->unLocked) {
|
||||
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
|
||||
}
|
||||
tsem_wait(&operation->rspSem);
|
||||
if (!operation->unLocked) {
|
||||
CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
|
||||
}
|
||||
taosMemoryFree(operation);
|
||||
}
|
||||
|
||||
|
@ -1011,6 +1017,7 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool
|
|||
op->opId = CTG_OP_CLEAR_CACHE;
|
||||
op->syncOp = syncOp;
|
||||
op->stopQueue = stopQueue;
|
||||
op->unLocked = true;
|
||||
|
||||
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
|
||||
if (NULL == msg) {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "catalogInt.h"
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
SCtgDebug gCTGDebug = {0};
|
||||
SCtgDebug gCTGDebug = {.lockEnable = true};
|
||||
|
||||
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
|
||||
ASSERT(*(int32_t*)param == 1);
|
||||
|
|
|
@ -331,7 +331,7 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
|||
SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pBatchs) {
|
||||
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
pTask->pBatchs = pBatchs;
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue