commit
777cde390f
|
@ -37,6 +37,7 @@ typedef struct {
|
|||
TSKEY keyLast;
|
||||
int64_t numOfRows;
|
||||
SSkipList* pData;
|
||||
T_REF_DECLARE()
|
||||
} STableData;
|
||||
|
||||
typedef struct {
|
||||
|
@ -76,7 +77,7 @@ typedef struct {
|
|||
|
||||
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem, SArray* pATable);
|
||||
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
|
||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||
|
|
|
@ -124,17 +124,66 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
|
||||
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem, SArray *pATable) {
|
||||
SMemTable *tmem;
|
||||
|
||||
// Get snap object
|
||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||
|
||||
*pMem = pRepo->mem;
|
||||
tmem = pRepo->mem;
|
||||
*pIMem = pRepo->imem;
|
||||
tsdbRefMemTable(pRepo, *pMem);
|
||||
tsdbRefMemTable(pRepo, tmem);
|
||||
tsdbRefMemTable(pRepo, *pIMem);
|
||||
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
|
||||
if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch));
|
||||
// Copy mem objects and ref needed STableData
|
||||
if (tmem) {
|
||||
taosRLockLatch(&(tmem->latch));
|
||||
|
||||
*pMem = (SMemTable *)calloc(1, sizeof(**pMem));
|
||||
if (*pMem == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
taosRUnLockLatch(&(tmem->latch));
|
||||
tsdbUnRefMemTable(pRepo, tmem);
|
||||
tsdbUnRefMemTable(pRepo, *pIMem);
|
||||
*pMem = NULL;
|
||||
*pIMem = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
(*pMem)->tData = (STableData **)calloc(tmem->maxTables, sizeof(STableData *));
|
||||
if ((*pMem)->tData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
taosRUnLockLatch(&(tmem->latch));
|
||||
free(*pMem);
|
||||
tsdbUnRefMemTable(pRepo, tmem);
|
||||
tsdbUnRefMemTable(pRepo, *pIMem);
|
||||
*pMem = NULL;
|
||||
*pIMem = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
(*pMem)->keyFirst = tmem->keyFirst;
|
||||
(*pMem)->keyLast = tmem->keyLast;
|
||||
(*pMem)->numOfRows = tmem->numOfRows;
|
||||
(*pMem)->maxTables = tmem->maxTables;
|
||||
|
||||
for (size_t i = 0; i < taosArrayGetSize(pATable); i++) {
|
||||
STable * pTable = *(STable **)taosArrayGet(pATable, i);
|
||||
int32_t tid = TABLE_TID(pTable);
|
||||
STableData *pTableData = (tid < tmem->maxTables) ? tmem->tData[tid] : NULL;
|
||||
|
||||
if ((pTableData == NULL) || (TABLE_UID(pTable) != pTableData->uid)) continue;
|
||||
|
||||
(*pMem)->tData[tid] = tmem->tData[tid];
|
||||
T_REF_INC(tmem->tData[tid]);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&(tmem->latch));
|
||||
}
|
||||
|
||||
tsdbUnRefMemTable(pRepo, tmem);
|
||||
|
||||
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
|
||||
return 0;
|
||||
|
@ -144,8 +193,14 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
|
|||
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
|
||||
|
||||
if (pMem != NULL) {
|
||||
taosRUnLockLatch(&(pMem->latch));
|
||||
tsdbUnRefMemTable(pRepo, pMem);
|
||||
for (size_t i = 0; i < pMem->maxTables; i++) {
|
||||
STableData *pTableData = pMem->tData[i];
|
||||
if (pTableData) {
|
||||
tsdbFreeTableData(pTableData);
|
||||
}
|
||||
}
|
||||
free(pMem->tData);
|
||||
free(pMem);
|
||||
}
|
||||
|
||||
if (pIMem != NULL) {
|
||||
|
@ -436,7 +491,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
|
|||
STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData));
|
||||
if (pTableData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTableData->uid = TABLE_UID(pTable);
|
||||
|
@ -449,20 +504,22 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
|
|||
tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
|
||||
if (pTableData->pData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
free(pTableData);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pTableData;
|
||||
T_REF_INC(pTableData);
|
||||
|
||||
_err:
|
||||
tsdbFreeTableData(pTableData);
|
||||
return NULL;
|
||||
return pTableData;
|
||||
}
|
||||
|
||||
static void tsdbFreeTableData(STableData *pTableData) {
|
||||
if (pTableData) {
|
||||
tSkipListDestroy(pTableData->pData);
|
||||
free(pTableData);
|
||||
int32_t ref = T_REF_DEC(pTableData);
|
||||
if (ref == 0) {
|
||||
tSkipListDestroy(pTableData->pData);
|
||||
free(pTableData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -187,13 +187,15 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
|
|||
return pLocalIdList;
|
||||
}
|
||||
|
||||
static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
|
||||
static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle, SArray* psTable) {
|
||||
assert(pQueryHandle != NULL && pQueryHandle->pMemRef != NULL);
|
||||
|
||||
SMemRef* pMemRef = pQueryHandle->pMemRef;
|
||||
if (pQueryHandle->pMemRef->ref++ == 0) {
|
||||
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem));
|
||||
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem), psTable);
|
||||
}
|
||||
|
||||
taosArrayDestroy(psTable);
|
||||
}
|
||||
|
||||
static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
|
||||
|
@ -242,7 +244,7 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
|
|||
return rows;
|
||||
}
|
||||
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta, SArray** psTable) {
|
||||
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
|
||||
assert(sizeOfGroup >= 1 && pMeta != NULL);
|
||||
|
||||
|
@ -252,6 +254,12 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SArray* pTable = taosArrayInit(4, sizeof(STable*));
|
||||
if (pTable == NULL) {
|
||||
taosArrayDestroy(pTableCheckInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
||||
SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i);
|
||||
|
@ -277,31 +285,40 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
|
|||
assert(info.lastKey <= pQueryHandle->window.skey);
|
||||
}
|
||||
|
||||
taosArrayPush(pTable, &pKeyInfo->pTable);
|
||||
|
||||
taosArrayPush(pTableCheckInfo, &info);
|
||||
tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid,
|
||||
info.tableId.tid, info.lastKey, pQueryHandle->qinfo);
|
||||
}
|
||||
}
|
||||
|
||||
*psTable = pTable;
|
||||
|
||||
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
return pTableCheckInfo;
|
||||
}
|
||||
|
||||
static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) {
|
||||
static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey, SArray** psTable) {
|
||||
size_t si = taosArrayGetSize(pTableCheckInfo);
|
||||
SArray* pNew = taosArrayInit(si, sizeof(STableCheckInfo));
|
||||
if (pNew == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SArray* pTable = taosArrayInit(si, sizeof(STable*));
|
||||
|
||||
for (int32_t j = 0; j < si; ++j) {
|
||||
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, j);
|
||||
STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
|
||||
|
||||
info.tableId = pCheckInfo->tableId;
|
||||
taosArrayPush(pNew, &info);
|
||||
taosArrayPush(pTable, &pCheckInfo->pTableObj);
|
||||
}
|
||||
|
||||
*psTable = pTable;
|
||||
|
||||
// it is ordered already, no need to sort again.
|
||||
taosArraySort(pNew, tsdbCheckInfoCompar);
|
||||
return pNew;
|
||||
|
@ -332,7 +349,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
|||
goto out_of_memory;
|
||||
}
|
||||
|
||||
tsdbMayTakeMemSnapshot(pQueryHandle);
|
||||
//tsdbMayTakeMemSnapshot(pQueryHandle);
|
||||
assert(pCond != NULL && pCond->numOfCols > 0 && pMemRef != NULL);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pCond->order)) {
|
||||
|
@ -393,14 +410,18 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable
|
|||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||
assert(pMeta != NULL);
|
||||
|
||||
SArray* psTable = NULL;
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta);
|
||||
pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta, &psTable);
|
||||
if (pQueryHandle->pTableCheckInfo == NULL) {
|
||||
tsdbCleanupQueryHandle(pQueryHandle);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tsdbMayTakeMemSnapshot(pQueryHandle, psTable);
|
||||
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
|
||||
return (TsdbQueryHandleT) pQueryHandle;
|
||||
}
|
||||
|
@ -2337,12 +2358,18 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM
|
|||
pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pMemRef);
|
||||
|
||||
tfree(cond.colList);
|
||||
pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey);
|
||||
|
||||
SArray* psTable = NULL;
|
||||
|
||||
pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey, &psTable);
|
||||
if (pSecQueryHandle->pTableCheckInfo == NULL) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto out_of_memory;
|
||||
}
|
||||
|
||||
|
||||
tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
|
||||
|
||||
if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
|
||||
// no result in current query, free the corresponding result rows structure
|
||||
if (type == TSDB_PREV_ROW) {
|
||||
|
|
Loading…
Reference in New Issue