From e9a3969b8e3a4ff1ce53cf0d6610aeff336f1d8e Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 1 Jun 2021 19:36:59 +0800 Subject: [PATCH 1/6] [TD-3963]async commit a msg to save config --- src/tsdb/inc/tsdbMemTable.h | 1 + src/tsdb/src/tsdbCommitQueue.c | 9 ++++++++- src/tsdb/src/tsdbMain.c | 2 +- src/tsdb/src/tsdbMemTable.c | 17 ++++++++++++++++- 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h index 6046274af4..868a10d361 100644 --- a/src/tsdb/inc/tsdbMemTable.h +++ b/src/tsdb/inc/tsdbMemTable.h @@ -66,6 +66,7 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pAT void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); +int tsdbAsyncCommitConfig(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); void* tsdbCommitData(STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index abea79bc4f..4d4585421c 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -154,6 +154,7 @@ static void *tsdbLoopCommit(void *arg) { SCommitQueue *pQueue = &tsCommitQueue; SListNode * pNode = NULL; STsdbRepo * pRepo = NULL; + bool config_changed = false; while (true) { pthread_mutex_lock(&(pQueue->lock)); @@ -177,11 +178,17 @@ static void *tsdbLoopCommit(void *arg) { pRepo = ((SCommitReq *)pNode->data)->pRepo; // check if need to apply new config + config_changed = pRepo->config_changed; if (pRepo->config_changed) { tsdbApplyRepoConfig(pRepo); } - tsdbCommitData(pRepo); + if (config_changed && pRepo->imem == NULL) { + tsem_post(&(pRepo->readyToCommit)); + } else { + tsdbCommitData(pRepo); + } + listNodeFree(pNode); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index afbedd5b2f..3a375ec59c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -271,7 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pthread_mutex_unlock(&repo->save_mutex); // schedule a commit msg then the new config will be applied immediatly - tsdbAsyncCommit(repo); + tsdbAsyncCommitConfig(repo); return 0; #if 0 diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 79dbb8be5d..4ca7b3b820 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -271,10 +271,25 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { return ptr; } +int tsdbAsyncCommitConfig(STsdbRepo* pRepo) { + ASSERT(pRepo->config_changed == true); + tsem_wait(&(pRepo->readyToCommit)); + + if (pRepo->code != TSDB_CODE_SUCCESS) { + tsdbWarn("vgId:%d try to commit config when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno)); + } + + if (tsdbLockRepo(pRepo) < 0) return -1; + tsdbScheduleCommit(pRepo); + if (tsdbUnlockRepo(pRepo) < 0) return -1; + + return 0; +} + int tsdbAsyncCommit(STsdbRepo *pRepo) { tsem_wait(&(pRepo->readyToCommit)); - //ASSERT(pRepo->imem == NULL); + ASSERT(pRepo->imem == NULL); if (pRepo->mem == NULL) { tsem_post(&(pRepo->readyToCommit)); return 0; From 61f8139178c24ca2d8888b24e85aab890155cc57 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Thu, 3 Jun 2021 18:17:22 +0800 Subject: [PATCH 2/6] update test case --- tests/pytest/query/last_row_cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pytest/query/last_row_cache.py b/tests/pytest/query/last_row_cache.py index a0e8147709..f116d0254e 100644 --- a/tests/pytest/query/last_row_cache.py +++ b/tests/pytest/query/last_row_cache.py @@ -25,7 +25,7 @@ class TDTestCase: self.tables = 10 self.rows = 20 - self.columns = 50 + self.columns = 5 self.perfix = 't' self.ts = 1601481600000 @@ -34,7 +34,7 @@ class TDTestCase: sql = "create table st(ts timestamp, " for i in range(self.columns - 1): sql += "c%d int, " % (i + 1) - sql += "c50 int) tags(t1 int)" + sql += "c5 int) tags(t1 int)" tdSql.execute(sql) for i in range(self.tables): From e81c7324dbd109d19b0eff14681b8058a4cf3eaf Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 4 Jun 2021 11:23:01 +0800 Subject: [PATCH 3/6] [TD-3963]fix cache_last_row bug --- src/tsdb/src/tsdbMain.c | 3 +-- src/tsdb/src/tsdbMemTable.c | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 3a375ec59c..5a4c785f02 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -907,13 +907,12 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { atomic_store_8(&pRepo->hasCachedLastColumn, 0); } tsdbInfo("free cache last data since cacheLast option changed"); - for (int i = 1; i < maxTableIdx; i++) { + for (int i = 1; i <= maxTableIdx; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; if (need_free_last_row) { taosTZfree(pTable->lastRow); pTable->lastRow = NULL; - pTable->lastKey = TSKEY_INITIAL_VAL; } if (need_free_last_col) { tsdbFreeLastColumns(pTable); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 1f88474f8e..ec773dbb38 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -1030,7 +1030,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow taosTZfree(pTable->lastRow); TSDB_WLOCK_TABLE(pTable); pTable->lastRow = NULL; - pTable->lastKey = TSKEY_INITIAL_VAL; TSDB_WUNLOCK_TABLE(pTable); } From d7219d1aaaa48d4c86337671d4505fdf59726832 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 4 Jun 2021 14:50:47 +0800 Subject: [PATCH 4/6] [TD-3963]sync config change msg and wait for the config applied --- src/tsdb/inc/tsdbMemTable.h | 2 +- src/tsdb/src/tsdbMain.c | 4 ++-- src/tsdb/src/tsdbMemTable.c | 11 ++++++++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h index 868a10d361..babb7024b2 100644 --- a/src/tsdb/inc/tsdbMemTable.h +++ b/src/tsdb/inc/tsdbMemTable.h @@ -66,7 +66,7 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pAT void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); -int tsdbAsyncCommitConfig(STsdbRepo* pRepo); +int tsdbSyncCommitConfig(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); void* tsdbCommitData(STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 5a4c785f02..ce20bf1821 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -270,8 +270,8 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pthread_mutex_unlock(&repo->save_mutex); - // schedule a commit msg then the new config will be applied immediatly - tsdbAsyncCommitConfig(repo); + // schedule a commit msg and wait for the new config applied + tsdbSyncCommitConfig(repo); return 0; #if 0 diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ec773dbb38..9d8b1ca7f2 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -271,7 +271,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { return ptr; } -int tsdbAsyncCommitConfig(STsdbRepo* pRepo) { +int tsdbSyncCommitConfig(STsdbRepo* pRepo) { ASSERT(pRepo->config_changed == true); tsem_wait(&(pRepo->readyToCommit)); @@ -283,6 +283,15 @@ int tsdbAsyncCommitConfig(STsdbRepo* pRepo) { tsdbScheduleCommit(pRepo, COMMIT_CONFIG_REQ); if (tsdbUnlockRepo(pRepo) < 0) return -1; + tsem_wait(&(pRepo->readyToCommit)); + tsem_post(&(pRepo->readyToCommit)); + + if (pRepo->code != TSDB_CODE_SUCCESS) { + terrno = pRepo->code; + return -1; + } + + terrno = TSDB_CODE_SUCCESS; return 0; } From 2241f2f2cb941d8733381b0fc1f16bbb481fbf95 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 5 Jun 2021 10:04:07 +0800 Subject: [PATCH 5/6] [TD-3963]fix tsdbGetCachedLastRow:when lastRow == NULL return TSDB_CODE_TDB_NO_CACHE_LAST_ROW --- src/inc/taoserror.h | 1 + src/tsdb/inc/tsdbint.h | 1 - src/tsdb/src/tsdbMain.c | 11 +---------- src/tsdb/src/tsdbRead.c | 28 +++++++++++++++------------- 4 files changed, 17 insertions(+), 24 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index ab15e851e7..431c9116cc 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -244,6 +244,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk") #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message") #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value") +#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data") // query #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle") diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 049c1bdb6e..7cf8882631 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -78,7 +78,6 @@ struct STsdbRepo { bool config_changed; // config changed flag pthread_mutex_t save_mutex; // protect save config - uint8_t hasCachedLastRow; uint8_t hasCachedLastColumn; STsdbAppH appH; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index ce20bf1821..bb02e01283 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -553,7 +553,6 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } pRepo->config_changed = false; - atomic_store_8(&pRepo->hasCachedLastRow, 0); atomic_store_8(&pRepo->hasCachedLastColumn, 0); code = tsem_init(&(pRepo->readyToCommit), 0, 1); @@ -857,9 +856,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { } tsdbDestroyReadH(&readh); - if (CACHE_LAST_ROW(pCfg)) { - atomic_store_8(&pRepo->hasCachedLastRow, 1); - } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { atomic_store_8(&pRepo->hasCachedLastColumn, 1); } @@ -900,9 +897,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { // if close last option,need to free data if (need_free_last_row || need_free_last_col) { - if (need_free_last_row) { - atomic_store_8(&pRepo->hasCachedLastRow, 0); - } if (need_free_last_col) { atomic_store_8(&pRepo->hasCachedLastColumn, 0); } @@ -982,9 +976,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { tsdbDestroyReadH(&readh); - if (cacheLastRow) { - atomic_store_8(&pRepo->hasCachedLastRow, 1); - } if (cacheLastCol) { atomic_store_8(&pRepo->hasCachedLastColumn, 1); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1545d44395..bbfa723cb5 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2469,7 +2469,6 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { if (ret != TSDB_CODE_SUCCESS) { return false; } - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); tfree(pRow); @@ -2860,24 +2859,27 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) { } /* - * 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL - * 2. has data but not loaded, just return lastKey but not set pRes - * 3. has data and loaded, return lastKey and set pRes + * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW + * else set pRes and return TSDB_CODE_SUCCESS */ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { - TSDB_RLOCK_TABLE(pTable); - *lastKey = pTable->lastKey; + int32_t code = TSDB_CODE_SUCCESS; - if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) { - *pRes = tdDataRowDup(pTable->lastRow); - if (*pRes == NULL) { - TSDB_RUNLOCK_TABLE(pTable); - return TSDB_CODE_TDB_OUT_OF_MEMORY; - } + TSDB_RLOCK_TABLE(pTable); + + if (!pTable->lastRow) { + code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW; + goto out; } + *pRes = tdDataRowDup(pTable->lastRow); + if (*pRes == NULL) { + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + } + +out: TSDB_RUNLOCK_TABLE(pTable); - return TSDB_CODE_SUCCESS; + return code; } bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) { From 9d353971b7816488eb78dad2cf8dab7d10cf5510 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 5 Jun 2021 12:14:57 +0800 Subject: [PATCH 6/6] [TD-3963]no need to save last row in checkForCachedLastRow --- src/tsdb/src/tsdbRead.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index bbfa723cb5..c333294179 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2860,7 +2860,7 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) { /* * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW - * else set pRes and return TSDB_CODE_SUCCESS + * else set pRes and return TSDB_CODE_SUCCESS and save lastKey */ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { int32_t code = TSDB_CODE_SUCCESS; @@ -2872,9 +2872,11 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { goto out; } - *pRes = tdDataRowDup(pTable->lastRow); - if (*pRes == NULL) { - code = TSDB_CODE_TDB_OUT_OF_MEMORY; + if (pRes) { + *pRes = tdDataRowDup(pTable->lastRow); + if (*pRes == NULL) { + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + } } out: @@ -2889,7 +2891,6 @@ bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) { int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) { assert(pQueryHandle != NULL && groupList != NULL); - SDataRow pRow = NULL; TSKEY key = TSKEY_INITIAL_VAL; SArray* group = taosArrayGetP(groupList->pGroupList, 0); @@ -2900,7 +2901,7 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g int32_t code = 0; if (((STable*)pInfo->pTable)->lastRow) { - code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key); + code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key); if (code != TSDB_CODE_SUCCESS) { pQueryHandle->cachelastrow = 0; } else { @@ -2915,7 +2916,6 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g pQueryHandle->activeIndex = -1; // start from -1 } - tfree(pRow); return code; }