diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index f27af3b722..70f8e054f8 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -123,8 +123,6 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); } - ASSERT(!POOL_IS_EMPTY(pBufPool)); - SListNode * pNode = tdListPopHead(pBufPool->bufBlockList); STsdbBufBlock *pBufBlock = NULL; tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock)); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 55fda4e116..cfb405f518 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -97,7 +97,6 @@ int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { // Need to lock the repository int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { - ASSERT(IS_REPO_LOCKED(pRepo)); ASSERT(pMemTable != NULL); if (T_REF_DEC(pMemTable) == 0) { @@ -105,12 +104,15 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { STsdbBufPool *pBufPool = pRepo->pPool; SListNode *pNode = NULL; + // TODO: check the correctness of this code part + if (tsdbLockRepo(pRepo) < 0) return -1; while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { tdListAppendNode(pBufPool->bufBlockList, pNode); if (pthread_cond_signal(&pBufPool->poolNotEmpty) != 0) { // TODO } } + if (tsdbUnlockRepo(pRepo) < 0) return -1; for (int i = 0; i < pCfg->maxTables; i++) { if (pMemTable->tData[i] != NULL) { @@ -127,7 +129,8 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { // ---------------- LOCAL FUNCTIONS ---------------- static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { - if (pRepo == NULL || pRepo->mem == NULL) return NULL; + ASSERT(pRepo != NULL); + if (pRepo->mem == NULL) return NULL; SListNode *pNode = listTail(pRepo->mem); if (pNode == NULL) return NULL; @@ -141,45 +144,40 @@ static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { STsdbCfg * pCfg = &pRepo->config; STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + int code = 0; if (pBufBlock != NULL && pBufBlock->remain < bytes) { - if (listNEles(pRepo->mem) >= pCfg->totalBlocks / 2) { // need to trigger commit - if (pRepo->imem != NULL) { - if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); - - ASSERT(pRepo->commit == 0); - - SMemTable *pIMem = pRepo->imem; - if (tsdbLockRepo(pRepo) < 0) { - // TODO + if (listNEles(pRepo->mem) >= pCfg->totalBlocks / 2) { // need to commit mem + if (pRepo->imem) { + code = pthread_join(pRepo->commitThread, NULL); + if (code != 0) { + tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - - pRepo->imem = pRepo->mem; - pRepo->mem = NULL; - pRepo->commit = 1; - if (pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo) != 0) { - // TODO - tsdbUnlockRepo(pRepo); - return NULL; - } - if (tsdbUnlockRepo(pRepo) < 0) { - // TODO - return NULL; - } - - tsdbUnRefMemTable(pRepo, pIMem); } - } else { - if (tsdbLockRepo(pRepo) < 0) { - tsdbFreeMemTable(pMemTable); + + ASSERT(pRepo->commit == 0); + SMemTable *pImem = pRepo->imem; + + if (tsdbLockRepo(pRepo) < 0) return NULL; + pRepo->imem = pRepo->mem; + pRepo->mem = NULL; + pRepo->commit = 1; + code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo); + if (code != 0) { + tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + tsdbUnlockRepo(pRepo); return NULL; } + if (tsdbUnlockRepo(pRepo) < 0) return NULL; + if (pImem && tsdbUnRefMemTable(pRepo, pImem) < 0) return NULL; + } else { + if (tsdbLockRepo(pRepo) < 0) return NULL; SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); - tdListAppendNode(pMemTable->bufBlockList, pNode); - pRepo->mem = pMemTable; - + tdListAppendNode(pRepo->mem->bufBlockList, pNode); if (tsdbUnlockRepo(pRepo) < 0) return NULL; } }