diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8574ff85c3..1726d8696b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -22,48 +22,13 @@ #define ROCKS_BATCH_SIZE (4096) -#if 0 -static int32_t tsdbOpenBICache(STsdb *pTsdb) { - int32_t code = 0; - SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); - if (pCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - taosLRUCacheSetStrictCapacity(pCache, false); - - taosThreadMutexInit(&pTsdb->biMutex, NULL); - -_err: - pTsdb->biCache = pCache; - return code; -} - -static void tsdbCloseBICache(STsdb *pTsdb) { - SLRUCache *pCache = pTsdb->biCache; - if (pCache) { - int32_t elems = taosLRUCacheGetElems(pCache); - tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); - taosLRUCacheEraseUnrefEntries(pCache); - elems = taosLRUCacheGetElems(pCache); - tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); - - taosLRUCacheCleanup(pCache); - - taosThreadMutexDestroy(&pTsdb->biMutex); - } -} -#endif - static int32_t tsdbOpenBCache(STsdb *pTsdb) { - int32_t code = 0; + int32_t code = 0, lino = 0; int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; int64_t szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize; SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5); if (pCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } taosLRUCacheSetStrictCapacity(pCache, false); @@ -73,7 +38,12 @@ static int32_t tsdbOpenBCache(STsdb *pTsdb) { pTsdb->bCache = pCache; _err: - return code; + if (code) { + tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + + TAOS_RETURN(code); } static void tsdbCloseBCache(STsdb *pTsdb) { @@ -92,23 +62,26 @@ static void tsdbCloseBCache(STsdb *pTsdb) { } static int32_t tsdbOpenPgCache(STsdb *pTsdb) { - int32_t code = 0; - // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); + int32_t code = 0, lino = 0; int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5); if (pCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } taosLRUCacheSetStrictCapacity(pCache, false); taosThreadMutexInit(&pTsdb->pgMutex, NULL); -_err: pTsdb->pgCache = pCache; - return code; + +_err: + if (code) { + tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code)); + } + + TAOS_RETURN(code); } static void tsdbClosePgCache(STsdb *pTsdb) { @@ -186,12 +159,11 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t } static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { - int32_t code = 0; + int32_t code = 0, lino = 0; rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName); if (NULL == cmp) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024); @@ -202,8 +174,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_options_t *options = rocksdb_options_create(); if (NULL == options) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } rocksdb_options_set_create_if_missing(options, 1); @@ -216,15 +187,13 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); if (NULL == writeoptions) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err2; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2); } rocksdb_writeoptions_disable_WAL(writeoptions, 1); rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); if (NULL == readoptions) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err3; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3); } char *err = NULL; @@ -236,14 +205,12 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); rocksdb_free(err); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err4; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4); } rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); if (NULL == flushoptions) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err5; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5); } rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); @@ -262,7 +229,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { pTsdb->rCache.pTSchema = NULL; - return code; + TAOS_RETURN(code); _err5: rocksdb_close(pTsdb->rCache.db); @@ -276,7 +243,8 @@ _err2: rocksdb_cache_destroy(cache); _err: rocksdb_comparator_destroy(cmp); - return code; + + TAOS_RETURN(code); } static void tsdbCloseRocksCache(STsdb *pTsdb) { @@ -371,24 +339,26 @@ static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) { } } -static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) { +static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) { if (!value) { - return NULL; + return TSDB_CODE_INVALID_PARA; } SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); if (NULL == pLastCol) { - return NULL; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } int32_t offset = tsdbCacheDeserializeV0(value, pLastCol); if (offset == size) { // version 0 - return pLastCol; + *ppLastCol = pLastCol; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } else if (offset > size) { - terrno = TSDB_CODE_INVALID_DATA_FMT; taosMemoryFreeClear(pLastCol); - return NULL; + + TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT); } // version @@ -414,12 +384,14 @@ static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) { } if (offset > size) { - terrno = TSDB_CODE_INVALID_DATA_FMT; taosMemoryFreeClear(pLastCol); - return NULL; + + TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT); } - return pLastCol; + *ppLastCol = pLastCol; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } /* @@ -457,7 +429,7 @@ static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) { return 0; } -static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { +static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { *size = sizeof(SLastColV0); if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { *size += pLastCol->colVal.value.nData; @@ -472,6 +444,9 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { } *value = taosMemoryMalloc(*size); + if (NULL == *value) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } int32_t offset = tsdbCacheSerializeV0(*value, pLastCol); @@ -494,16 +469,22 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { offset += pLastCol->rowKey.pks[i].nData; } } + + TAOS_RETURN(TSDB_CODE_SUCCESS); } static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) { + int32_t code = 0; STsdb *pTsdb = state->pTsdb; SRocksCache *rCache = &pTsdb->rCache; rocksdb_writebatch_t *wb = rCache->writebatch; char *rocks_value = NULL; size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); + code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); + if (code) { + tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } taosThreadMutexLock(&rCache->rMutex); @@ -561,25 +542,31 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); rocksdb_free(err); - code = -1; + code = TSDB_CODE_FAILED; } - return code; + TAOS_RETURN(code); } -static void reallocVarDataVal(SValue *pValue) { +static int32_t reallocVarDataVal(SValue *pValue) { if (IS_VAR_DATA_TYPE(pValue->type)) { uint8_t *pVal = pValue->pData; if (pValue->nData > 0) { - pValue->pData = taosMemoryMalloc(pValue->nData); + uint8_t *p = taosMemoryMalloc(pValue->nData); + if (!p) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + pValue->pData = p; memcpy(pValue->pData, pVal, pValue->nData); } else { pValue->pData = NULL; } } + + TAOS_RETURN(TSDB_CODE_SUCCESS); } -static void reallocVarData(SColVal *pColVal) { reallocVarDataVal(&pColVal->value); } +static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); } static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { SLastCol *pLastCol = (SLastCol *)value; @@ -614,6 +601,9 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i SLastCol *pLastCol = &noneCol; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (!pTmpLastCol) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; @@ -622,13 +612,13 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { SValue *pValue = &pLastCol->rowKey.pks[i]; if (IS_VAR_DATA_TYPE(pValue->type)) { - reallocVarDataVal(pValue); + TAOS_CHECK_RETURN(reallocVarDataVal(pValue)); charge += pValue->nData; } } if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - reallocVarData(&pLastCol->colVal); + TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); charge += pLastCol->colVal.value.nData; } @@ -636,20 +626,10 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); if (status != TAOS_LRU_STATUS_OK) { - code = -1; + // code = -1; } - /* - // store result back to rocks cache - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &value, &vlen); - SLastKey *key = pLastKey; - size_t klen = ROCKS_KEY_LEN; - rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); - taosMemoryFree(value); - */ - return code; + TAOS_RETURN(code); } int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { @@ -668,21 +648,33 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); rocksdb_free(err); - code = -1; + code = TSDB_CODE_FAILED; } - return code; + TAOS_RETURN(code); } static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) { int32_t code = 0; // build keys & multi get from rocks - char **keys_list = taosMemoryCalloc(2, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); + char **keys_list = taosMemoryCalloc(2, sizeof(char *)); + if (!keys_list) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); + if (!keys_list_sizes) { + taosMemoryFree(keys_list); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } const size_t klen = ROCKS_KEY_LEN; char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); + if (!keys) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid}; ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; @@ -691,9 +683,30 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, keys_list_sizes[0] = klen; keys_list_sizes[1] = klen; - char **values_list = taosMemoryCalloc(2, sizeof(char *)); + char **values_list = taosMemoryCalloc(2, sizeof(char *)); + if (!values_list) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(keys_list[0]); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } size_t *values_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); - char **errs = taosMemoryCalloc(2, sizeof(char *)); + if (!values_list_sizes) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(keys_list[0]); + taosMemoryFree(values_list); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + char **errs = taosMemoryCalloc(2, sizeof(char *)); + if (!errs) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(keys_list[0]); + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } // rocksMayWrite(pTsdb, true, false, false); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, 2, (const char *const *)keys_list, keys_list_sizes, @@ -708,13 +721,21 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], values_list_sizes[0]); + SLastCol *pLastCol = NULL; + code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol); + if (code) { + tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[0], klen); } taosMemoryFreeClear(pLastCol); - pLastCol = tsdbCacheDeserialize(values_list[1], values_list_sizes[1]); + pLastCol = NULL; + code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol); + if (code) { + tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[1], klen); } @@ -751,7 +772,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - return code; + TAOS_RETURN(code); } int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) { @@ -772,8 +793,8 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); if (code != TSDB_CODE_SUCCESS) { taosThreadMutexUnlock(&pTsdb->lruMutex); - terrno = code; - return -1; + + TAOS_RETURN(code); } for (int i = 0; i < pTSchema->numOfCols; ++i) { @@ -789,7 +810,7 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap taosThreadMutexUnlock(&pTsdb->lruMutex); - return code; + TAOS_RETURN(code); } int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) { @@ -815,8 +836,9 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra STSchema *pTSchema = NULL; code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; + taosThreadMutexUnlock(&pTsdb->lruMutex); + + TAOS_RETURN(code); } bool hasPrimayKey = false; @@ -838,7 +860,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra taosThreadMutexUnlock(&pTsdb->lruMutex); - return code; + TAOS_RETURN(code); } int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { @@ -851,9 +873,11 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { STSchema *pTSchema = NULL; code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; + taosThreadMutexUnlock(&pTsdb->lruMutex); + + TAOS_RETURN(code); } + for (int i = 0; i < TARRAY_SIZE(uids); ++i) { int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; @@ -877,7 +901,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { taosThreadMutexUnlock(&pTsdb->lruMutex); - return code; + TAOS_RETURN(code); } int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { @@ -892,7 +916,7 @@ int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t taosThreadMutexUnlock(&pTsdb->lruMutex); //(void)tsdbCacheCommit(pTsdb); - return code; + TAOS_RETURN(code); } int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) { @@ -908,7 +932,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h taosThreadMutexUnlock(&pTsdb->lruMutex); - return code; + TAOS_RETURN(code); } int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { @@ -927,7 +951,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t taosThreadMutexUnlock(&pTsdb->lruMutex); //(void)tsdbCacheCommit(pTsdb); - return code; + TAOS_RETURN(code); } int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) { @@ -947,7 +971,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool taosThreadMutexUnlock(&pTsdb->lruMutex); - return code; + TAOS_RETURN(code); } typedef struct { @@ -968,6 +992,9 @@ static int32_t tsdbCacheUpdateValue(SValue *pOld, SValue *pNew) { if (IS_VAR_DATA_TYPE(pNew->type)) { if (nData < pNew->nData) { pOld->pData = taosMemoryCalloc(1, pNew->nData); + if (!pOld->pData) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } } else { pOld->pData = pFree; pFree = NULL; @@ -982,7 +1009,8 @@ static int32_t tsdbCacheUpdateValue(SValue *pOld, SValue *pNew) { } taosMemoryFreeClear(pFree); - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal *pColVal) { @@ -1008,10 +1036,10 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) { if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) { - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } - int32_t code = 0; + int32_t code = 0, lino = 0; int num_keys = TARRAY_SIZE(updCtxArray); SArray *remainCols = NULL; @@ -1051,17 +1079,35 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray num_keys = TARRAY_SIZE(remainCols); } if (remainCols && num_keys > 0) { - char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + char **keys_list = NULL; + size_t *keys_list_sizes = NULL; + char **values_list = NULL; + size_t *values_list_sizes = NULL; + char **errs = NULL; + keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); + keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + if (!keys_list || !keys_list_sizes) { + taosMemoryFree(keys_list); + taosThreadMutexUnlock(&pTsdb->lruMutex); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; keys_list[i] = (char *)&idxKey->key; keys_list_sizes[i] = ROCKS_KEY_LEN; } - char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); - size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); - char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); + values_list = taosMemoryCalloc(num_keys, sizeof(char *)); + values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + errs = taosMemoryCalloc(num_keys, sizeof(char *)); + if (!values_list || !values_list_sizes || !errs) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + taosThreadMutexUnlock(&pTsdb->lruMutex); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { @@ -1076,7 +1122,11 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray SRowKey *pRowKey = &updCtx->tsdbRowKey.key; SColVal *pColVal = &updCtx->colVal; - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); + SLastCol *pLastCol = NULL; + code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); + if (code) { + tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } SLastCol *PToFree = pLastCol; if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) { @@ -1094,7 +1144,10 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray char *value = NULL; size_t vlen = 0; SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; - tsdbCacheSerialize(&lastColTmp, &value, &vlen); + code = tsdbCacheSerialize(&lastColTmp, &value, &vlen); + if (code) { + tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1104,6 +1157,16 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray pLastCol = &lastColTmp; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (!pTmpLastCol) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + taosArrayDestroy(remainCols); + taosThreadMutexUnlock(&pTsdb->lruMutex); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; @@ -1111,13 +1174,13 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { SValue *pValue = &pLastCol->rowKey.pks[i]; if (IS_VAR_DATA_TYPE(pValue->type)) { - reallocVarDataVal(pValue); + TAOS_CHECK_GOTO(reallocVarDataVal(pValue), &lino, _exit); charge += pValue->nData; } } if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - reallocVarData(&pLastCol->colVal); + TAOS_CHECK_GOTO(reallocVarData(&pLastCol->colVal), &lino, _exit); charge += pLastCol->colVal.value.nData; } @@ -1144,15 +1207,19 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray taosArrayDestroy(remainCols); } +_exit: taosThreadMutexUnlock(&pTsdb->lruMutex); -_exit: - return code; + if (code) { + tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code)); + } + + TAOS_RETURN(code); } int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow) { - int32_t code = 0; + int32_t code = 0, lino = 0; // 1. prepare last TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; @@ -1162,11 +1229,7 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 SArray *ctxArray = NULL; SSHashObj *iColHash = NULL; - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - goto _exit; - } + TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit); TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t nCol = pTSchema->numOfCols; @@ -1227,7 +1290,8 @@ _exit: taosMemoryFreeClear(pTSchema); taosArrayDestroy(ctxArray); tSimpleHashCleanup(iColHash); - return code; + + TAOS_RETURN(code); } int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) { @@ -1239,11 +1303,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo int32_t sver = TSDBROW_SVERSION(&lRow); SArray *ctxArray = NULL; - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - goto _exit; - } + TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema)); ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx)); @@ -1299,7 +1359,8 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo _exit: taosMemoryFreeClear(pTSchema); taosArrayDestroy(ctxArray); - return 0; + + TAOS_RETURN(code); } static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, @@ -1324,18 +1385,30 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr int num_keys = TARRAY_SIZE(remainCols); int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); - int16_t *lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); - int16_t *lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); - int16_t *lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); - int16_t *lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); - SArray *lastTmpColArray = NULL; - SArray *lastTmpIndexArray = NULL; - SArray *lastrowTmpColArray = NULL; - SArray *lastrowTmpIndexArray = NULL; + int16_t *lastColIds = NULL; + int16_t *lastSlotIds = NULL; + int16_t *lastrowColIds = NULL; + int16_t *lastrowSlotIds = NULL; + lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + SArray *lastTmpColArray = NULL; + SArray *lastTmpIndexArray = NULL; + SArray *lastrowTmpColArray = NULL; + SArray *lastrowTmpIndexArray = NULL; int lastIndex = 0; int lastrowIndex = 0; + if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) { + taosMemoryFree(slotIds); + taosMemoryFree(lastColIds); + taosMemoryFree(lastSlotIds); + taosMemoryFree(lastrowColIds); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = taosArrayGet(remainCols, i); slotIds[i] = pr->pSlotIds[idxKey->idx]; @@ -1389,7 +1462,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; if (!pLastCol) { pLastCol = &noneCol; - reallocVarData(&pLastCol->colVal); + TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); } taosArraySet(pLastArray, idxKey->idx, pLastCol); @@ -1403,6 +1476,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (!pTmpLastCol) { + taosMemoryFree(slotIds); + taosMemoryFree(lastColIds); + taosMemoryFree(lastSlotIds); + taosMemoryFree(lastrowColIds); + taosMemoryFree(lastrowSlotIds); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; @@ -1410,26 +1491,29 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { SValue *pValue = &pLastCol->rowKey.pks[i]; if (IS_VAR_DATA_TYPE(pValue->type)) { - reallocVarDataVal(pValue); + TAOS_CHECK_RETURN(reallocVarDataVal(pValue)); charge += pValue->nData; } } if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - reallocVarData(&pLastCol->colVal); + TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); charge += pLastCol->colVal.value.nData; } LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); if (status != TAOS_LRU_STATUS_OK) { - code = -1; + // code = -1; } // store result back to rocks cache wb = pTsdb->rCache.rwritebatch; char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &value, &vlen); + code = tsdbCacheSerialize(pLastCol, &value, &vlen); + if (code) { + tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } SLastKey *key = &idxKey->key; size_t klen = ROCKS_KEY_LEN; @@ -1455,7 +1539,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr taosMemoryFree(slotIds); - return code; + TAOS_RETURN(code); } static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, @@ -1465,6 +1549,11 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); + if (!keys_list || !keys_list_sizes || !key_list) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } for (int i = 0; i < num_keys; ++i) { int16_t cid = *(int16_t *)taosArrayGet(remainCols, i); @@ -1476,6 +1565,13 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); + if (!values_list || !values_list_sizes || !errs) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { @@ -1485,14 +1581,26 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA rocksdb_free(errs[i]); } } + taosMemoryFree(errs); SLRUCache *pCache = pTsdb->lruCache; for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); + SLastCol *pLastCol = NULL; + code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); + if (code) { + tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } SLastCol *PToFree = pLastCol; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; if (pLastCol) { SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (!pTmpLastCol) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; @@ -1500,12 +1608,12 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { SValue *pValue = &pLastCol->rowKey.pks[i]; if (IS_VAR_DATA_TYPE(pValue->type)) { - reallocVarDataVal(pValue); + TAOS_CHECK_RETURN(reallocVarDataVal(pValue)); charge += pValue->nData; } } if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - reallocVarData(&pLastCol->colVal); + TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); charge += pLastCol->colVal.value.nData; } @@ -1517,9 +1625,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA SLastCol lastCol = *pLastCol; for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) { - reallocVarDataVal(&lastCol.rowKey.pks[i]); + TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[i])); } - reallocVarData(&lastCol.colVal); + TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArrayRemove(remainCols, j); @@ -1530,7 +1638,6 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA } } - taosMemoryFree(errs); taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); @@ -1542,7 +1649,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype); } - return code; + TAOS_RETURN(code); } int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { @@ -1572,9 +1679,9 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol lastCol = *pLastCol; for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { - reallocVarDataVal(&lastCol.rowKey.pks[j]); + TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j])); } - reallocVarData(&lastCol.colVal); + TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); taosArrayPush(pLastArray, &lastCol); taosLRUCacheRelease(pCache, h, false); @@ -1601,9 +1708,17 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol lastCol = *pLastCol; for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { - reallocVarDataVal(&lastCol.rowKey.pks[j]); + code = reallocVarDataVal(&lastCol.rowKey.pks[j]); + if (code) { + taosThreadMutexUnlock(&pTsdb->lruMutex); + TAOS_RETURN(code); + } + } + code = reallocVarData(&lastCol.colVal); + if (code) { + taosThreadMutexUnlock(&pTsdb->lruMutex); + TAOS_RETURN(code); } - reallocVarData(&lastCol.colVal); taosArraySet(pLastArray, idxKey->idx, &lastCol); taosLRUCacheRelease(pCache, h, false); @@ -1624,7 +1739,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache } } - return code; + TAOS_RETURN(code); } int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { @@ -1632,22 +1747,28 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE // fetch schema STSchema *pTSchema = NULL; int sver = -1; - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } + + TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema)); // build keys & multi get from rocks - int num_keys = pTSchema->numOfCols; - char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + int num_keys = pTSchema->numOfCols; + char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + if (!keys_list || !keys_list_sizes) { + taosMemoryFree(keys_list); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } const size_t klen = ROCKS_KEY_LEN; for (int i = 0; i < num_keys; ++i) { int16_t cid = pTSchema->columns[i].colId; char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); + if (!keys) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid}; ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; @@ -1659,6 +1780,15 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + if (!values_list_sizes || !values_list) { + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list); + for (int i = 0; i < num_keys; ++i) { + taosMemoryFree(keys_list[i]); + } + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } (void)tsdbCacheCommit(pTsdb); @@ -1679,14 +1809,22 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); + SLastCol *pLastCol = NULL; + code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); + if (code) { + tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } taosThreadMutexLock(&pTsdb->rCache.rMutex); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } taosMemoryFreeClear(pLastCol); - pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys]); + pLastCol = NULL; + code = tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys], &pLastCol); + if (code) { + tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + } if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } @@ -1746,45 +1884,23 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE _exit: taosMemoryFree(pTSchema); - return code; + TAOS_RETURN(code); } int32_t tsdbOpenCache(STsdb *pTsdb) { - int32_t code = 0; - SLRUCache *pCache = NULL; - size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; + int32_t code = 0, lino = 0; + size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; - pCache = taosLRUCacheInit(cfgCapacity, 0, .5); + SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5); if (pCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } -#if 0 - code = tsdbOpenBICache(pTsdb); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } -#endif + TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err); - code = tsdbOpenBCache(pTsdb); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err); - code = tsdbOpenPgCache(pTsdb); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - code = tsdbOpenRocksCache(pTsdb); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err); taosLRUCacheSetStrictCapacity(pCache, false); @@ -1794,8 +1910,13 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { pTsdb->flushState.flush_count = 0; _err: + if (code) { + tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code)); + } + pTsdb->lruCache = pCache; - return code; + + TAOS_RETURN(code); } void tsdbCloseCache(STsdb *pTsdb) { @@ -1808,9 +1929,6 @@ void tsdbCloseCache(STsdb *pTsdb) { taosThreadMutexDestroy(&pTsdb->lruMutex); } -#if 0 - tsdbCloseBICache(pTsdb); -#endif tsdbCloseBCache(pTsdb); tsdbClosePgCache(pTsdb); tsdbCloseRocksCache(pTsdb); @@ -1826,320 +1944,6 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { *len = sizeof(uint64_t); } -#ifdef BUILD_NO_CALL -static void deleteTableCacheLast(const void *key, size_t keyLen, void *value, void *ud) { - (void)ud; - SArray *pLastArray = (SArray *)value; - int16_t nCol = taosArrayGetSize(pLastArray); - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol); - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { - taosMemoryFree(pLastCol->colVal.value.pData); - } - } - - taosArrayDestroy(value); -} - -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { - int32_t code = 0; - - char key[32] = {0}; - int keyLen = 0; - - // getTableCacheKey(uid, "lr", key, &keyLen); - getTableCacheKey(uid, 0, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - taosLRUCacheRelease(pCache, h, invalidate); - if (invalidate) { - taosLRUCacheErase(pCache, key, keyLen); - } - } - - return code; -} - -int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { - int32_t code = 0; - - char key[32] = {0}; - int keyLen = 0; - - // getTableCacheKey(uid, "l", key, &keyLen); - getTableCacheKey(uid, 1, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - taosLRUCacheRelease(pCache, h, invalidate); - if (invalidate) { - taosLRUCacheErase(pCache, key, keyLen); - } - } - - return code; -} - -int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) { - int32_t code = 0; - STSRow *cacheRow = NULL; - char key[32] = {0}; - int keyLen = 0; - - // getTableCacheKey(uid, "lr", key, &keyLen); - getTableCacheKey(uid, 0, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); - TSKEY keyTs = TSDBROW_TS(row); - bool invalidate = false; - - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - int16_t nCol = taosArrayGetSize(pLast); - int16_t iCol = 0; - - if (nCol <= 0) { - nCol = pTSchema->numOfCols; - - STColumn *pTColumn = &pTSchema->columns[0]; - SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs}); - if (taosArrayPush(pLast, &(SLastCol){.ts = keyTs, .colVal = tColVal}) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - - for (iCol = 1; iCol < nCol; ++iCol) { - SColVal colVal = {0}; - tsdbRowGetColVal(row, pTSchema, iCol, &colVal); - - SLastCol lastCol = {.ts = keyTs, .colVal = colVal}; - if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) { - lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData); - } - - if (taosArrayPush(pLast, &lastCol) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - } - - goto _invalidate; - } - - if (nCol != pTSchema->numOfCols) { - invalidate = true; - goto _invalidate; - } - - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (keyTs > tTsVal->ts) { - STColumn *pTColumn = &pTSchema->columns[0]; - SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs}); - - taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal}); - } - - for (++iCol; iCol < nCol; ++iCol) { - SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol); - if (keyTs >= tTsVal1->ts) { - SColVal *tColVal = &tTsVal1->colVal; - - SColVal colVal = {0}; - tsdbRowGetColVal(row, pTSchema, iCol, &colVal); - - if (colVal.cid != tColVal->cid) { - invalidate = true; - goto _invalidate; - } - - if (!COL_VAL_IS_NONE(&colVal)) { - if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) { - invalidate = true; - - break; - } else { // new inserting key is greater than cached, update cached entry - SLastCol lastCol = {.ts = keyTs, .colVal = colVal}; - if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol); - if (pLastCol->colVal.value.nData > 0 && NULL != pLastCol->colVal.value.pData) - taosMemoryFree(pLastCol->colVal.value.pData); - - lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData); - } - - taosArraySet(pLast, iCol, &lastCol); - } - } - } - } - - _invalidate: - taosMemoryFreeClear(pTSchema); - - taosLRUCacheRelease(pCache, h, invalidate); - if (invalidate) { - taosLRUCacheErase(pCache, key, keyLen); - } - } - - return code; -} - -int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb) { - int32_t code = 0; - STSRow *cacheRow = NULL; - char key[32] = {0}; - int keyLen = 0; - - // getTableCacheKey(uid, "l", key, &keyLen); - getTableCacheKey(uid, 1, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); - TSKEY keyTs = TSDBROW_TS(row); - bool invalidate = false; - - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - int16_t nCol = taosArrayGetSize(pLast); - int16_t iCol = 0; - - if (nCol <= 0) { - nCol = pTSchema->numOfCols; - - STColumn *pTColumn = &pTSchema->columns[0]; - SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs}); - if (taosArrayPush(pLast, &(SLastCol){.ts = keyTs, .colVal = tColVal}) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - - for (iCol = 1; iCol < nCol; ++iCol) { - SColVal colVal = {0}; - tsdbRowGetColVal(row, pTSchema, iCol, &colVal); - - SLastCol lastCol = {.ts = keyTs, .colVal = colVal}; - if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) { - lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData); - } - - if (taosArrayPush(pLast, &lastCol) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - } - - goto _invalidate; - } - - if (nCol != pTSchema->numOfCols) { - invalidate = true; - goto _invalidate; - } - - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (keyTs > tTsVal->ts) { - STColumn *pTColumn = &pTSchema->columns[0]; - SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs}); - - taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal}); - } - - for (++iCol; iCol < nCol; ++iCol) { - SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol); - if (keyTs >= tTsVal1->ts) { - SColVal *tColVal = &tTsVal1->colVal; - - SColVal colVal = {0}; - tsdbRowGetColVal(row, pTSchema, iCol, &colVal); - - if (colVal.cid != tColVal->cid) { - invalidate = true; - goto _invalidate; - } - - if (COL_VAL_IS_VALUE(&colVal)) { - if (keyTs == tTsVal1->ts && COL_VAL_IS_VALUE(tColVal)) { - invalidate = true; - - break; - } else { - SLastCol lastCol = {.ts = keyTs, .colVal = colVal}; - if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol); - if (pLastCol->colVal.value.nData > 0 && NULL != pLastCol->colVal.value.pData) - taosMemoryFree(pLastCol->colVal.value.pData); - - lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _invalidate; - } - memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData); - } - - taosArraySet(pLast, iCol, &lastCol); - } - } - } - } - - _invalidate: - taosMemoryFreeClear(pTSchema); - - taosLRUCacheRelease(pCache, h, invalidate); - if (invalidate) { - taosLRUCacheErase(pCache, key, keyLen); - } - } - - return code; -} -#endif - static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { tb_uid_t suid = 0; @@ -2170,7 +1974,7 @@ static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelI code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX); } - return code; + TAOS_RETURN(code); } static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { @@ -2184,53 +1988,6 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { return code; } -#ifdef BUILD_NO_CALL -static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, - SArray *aDelData) { - int32_t code = 0; - - if (pDelIdx) { - code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData); - if (code) goto _err; - } - - if (pMem) { - code = getTableDelDataFromTbData(pMem, aDelData); - if (code) goto _err; - } - - if (pIMem) { - code = getTableDelDataFromTbData(pIMem, aDelData); - if (code) goto _err; - } - -_err: - return code; -} - -static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, - SArray *aSkyline) { - int32_t code = 0; - SArray *aDelData = NULL; - - aDelData = taosArrayInit(32, sizeof(SDelData)); - code = getTableDelData(pMem, pIMem, pDelReader, pDelIdx, aDelData); - if (code) goto _err; - - size_t nDelData = taosArrayGetSize(aDelData); - if (nDelData > 0) { - code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline); - if (code) goto _err; - } - -_err: - if (aDelData) { - taosArrayDestroy(aDelData); - } - return code; -} -#endif - static void freeTableInfoFunc(void *param) { void **p = (void **)param; taosMemoryFreeClear(*p); @@ -2247,7 +2004,9 @@ static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); if (!ppInfo) { pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); - tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); + if (pInfo) { + tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); + } return pInfo; } @@ -2260,6 +2019,9 @@ static uint64_t *getUidList(SCacheRowsReader *pReader) { int32_t numOfTables = pReader->numOfTables; pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t)); + if (!pReader->uidList) { + return NULL; + } for (int32_t i = 0; i < numOfTables; ++i) { uint64_t uid = pReader->pTableList[i].uid; @@ -2275,9 +2037,13 @@ static uint64_t *getUidList(SCacheRowsReader *pReader) { static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader, bool isFile) { int32_t code = 0; - uint64_t *uidList = getUidList(pReader); int32_t numOfTables = pReader->numOfTables; int64_t suid = pReader->info.suid; + uint64_t *uidList = getUidList(pReader); + + if (!uidList) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) { STombBlk *pTombBlk = &pTombBlkArray->data[i]; @@ -2294,7 +2060,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block) : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block); if (code != TSDB_CODE_SUCCESS) { - return code; + TAOS_RETURN(code); } uint64_t uid = uidList[j]; @@ -2358,37 +2124,28 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea tTombBlockDestroy(&block); if (finished) { - return code; + TAOS_RETURN(code); } } - return TSDB_CODE_SUCCESS; + TAOS_RETURN(TSDB_CODE_SUCCESS); } static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) { - int32_t code = 0; - const TTombBlkArray *pBlkArray = NULL; - code = tsdbDataFileReadTombBlk(pFileReader, &pBlkArray); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - return loadTombFromBlk(pBlkArray, pReader, pFileReader, true); + TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray)); + + TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true)); } static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) { - int32_t code = 0; - - SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader; - + SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader; const TTombBlkArray *pBlkArray = NULL; - code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - return loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false); + TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray)); + + TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false)); } typedef struct { @@ -2421,14 +2178,11 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb .pCurRowKey = &pr->rowKey, }; - code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL); - if (code != TSDB_CODE_SUCCESS) { - return -1; - } + TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL)); iter->pMergeTree = &iter->mergeTree; - return code; + TAOS_RETURN(code); } static int32_t lastIterClose(SFSLastIter **iter) { @@ -2441,7 +2195,7 @@ static int32_t lastIterClose(SFSLastIter **iter) { *iter = NULL; - return code; + TAOS_RETURN(code); } static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) { @@ -2450,12 +2204,13 @@ static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) { bool hasVal = tMergeTreeNext(iter->pMergeTree); if (!hasVal) { *ppRow = NULL; - return code; + + TAOS_RETURN(code); } *ppRow = tMergeTreeGetRow(iter->pMergeTree); - return code; + TAOS_RETURN(code); } typedef enum SFSNEXTROWSTATES { @@ -2506,8 +2261,8 @@ static void clearLastFileSet(SFSNextRowIter *state); static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, int nCols) { + int32_t code = 0, lino = 0; SFSNextRowIter *state = (SFSNextRowIter *)iter; - int32_t code = 0; STsdb *pTsdb = state->pr->pTsdb; if (SFSNEXTROW_FS == state->state) { @@ -2522,7 +2277,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (--state->iFileSet < 0) { *ppRow = NULL; - return code; + + TAOS_RETURN(code); } else { state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet); } @@ -2552,19 +2308,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie filesName[3] = pFileObj[3]->fname; } - code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err); state->pr->pCurFileSet = state->pFileSet; loadDataTomb(state->pr, state->pr->pFileReader); - int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err); } if (!state->pIndexList) { @@ -2601,16 +2351,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie state->pr->pCurFileSet = state->pFileSet; } - code = lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid, state->pr, - state->lastTs, aCols, nCols); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid, + state->pr, state->lastTs, aCols, nCols), + &lino, _err); - code = lastIterNext(&state->lastIter, &state->pLastRow); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err); if (!state->pLastRow) { state->lastEmpty = 1; @@ -2627,7 +2372,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie *ppRow = state->pLastRow; state->pLastRow = NULL; - return code; + + TAOS_RETURN(code); } } @@ -2635,10 +2381,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie } if (SFSNEXTROW_NEXTSTTROW == state->state) { - code = lastIterNext(&state->lastIter, &state->pLastRow); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err); if (!state->pLastRow) { if (state->pLastIter) { @@ -2651,7 +2394,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie } else { *ppRow = state->pLastRow; state->pLastRow = NULL; - return code; + + TAOS_RETURN(code); } } @@ -2677,10 +2421,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie } else { tBrinBlockClear(&state->brinBlock); } - code = tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + + TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err); state->iBrinRecord = state->brinBlock.numOfRecords - 1; state->state = SFSNEXTROW_BRINBLOCK; @@ -2692,10 +2434,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie tBrinBlockClear(&state->brinBlock); goto _next_brinindex; } - code = tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + + TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err); SBrinRecord *pRecord = &state->brinRecord; if (pRecord->uid != state->uid) { @@ -2712,8 +2452,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (!state->pBlockData) { state->pBlockData = &state->blockData; - code = tBlockDataCreate(&state->blockData); - if (code) goto _err; + + TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err); } else { tBlockDataReset(state->pBlockData); } @@ -2722,11 +2462,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie --nCols; ++aCols; } - code = tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData, state->pTSchema, aCols, - nCols); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + + TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData, + state->pTSchema, aCols, nCols), + &lino, _err); state->nRow = state->blockData.nRow; state->iRow = state->nRow - 1; @@ -2749,10 +2488,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (!state->pLastRow) { // get next row from fslast and process with fs row, --state->Row if select fs row - code = lastIterNext(&state->lastIter, &state->pLastRow); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err); } if (!state->pLastRow) { @@ -2771,34 +2507,27 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie if (lastRowTs > rowTs) { *ppRow = state->pLastRow; state->pLastRow = NULL; - return code; + + TAOS_RETURN(code); } else if (lastRowTs < rowTs) { *ppRow = &state->row; --state->iRow; - return code; + + TAOS_RETURN(code); } else { // TODO: merge rows and *ppRow = mergedRow SRowMerger *pMerger = &state->rowMerger; tsdbRowMergerInit(pMerger, state->pTSchema); - code = tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - code = tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err); + TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err); if (state->pTSRow) { taosMemoryFree(state->pTSRow); state->pTSRow = NULL; } - code = tsdbRowMergerGetRow(pMerger, &state->pTSRow); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err); state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow); *ppRow = &state->row; @@ -2806,7 +2535,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie tsdbRowMergerClear(pMerger); - return code; + TAOS_RETURN(code); } } @@ -2815,7 +2544,12 @@ _err: *ppRow = NULL; - return code; + if (code) { + tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + + TAOS_RETURN(code); } typedef enum SMEMNEXTROWSTATES { @@ -2841,7 +2575,8 @@ static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarli if (state->pMem->maxKey <= state->lastTs) { *ppRow = NULL; *pIgnoreEarlierTs = true; - return code; + + TAOS_RETURN(code); } tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); @@ -2850,23 +2585,23 @@ static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarli *ppRow = pMemRow; state->state = SMEMNEXTROW_NEXT; - return code; + TAOS_RETURN(code); } } *ppRow = NULL; - return code; + TAOS_RETURN(code); } case SMEMNEXTROW_NEXT: if (tsdbTbDataIterNext(&state->iter)) { *ppRow = tsdbTbDataIterGet(&state->iter); - return code; + TAOS_RETURN(code); } else { *ppRow = NULL; - return code; + TAOS_RETURN(code); } default: ASSERT(0); @@ -2875,7 +2610,8 @@ static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarli _err: *ppRow = NULL; - return code; + + TAOS_RETURN(code); } static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { @@ -2942,7 +2678,7 @@ int32_t clearNextRowFromFS(void *iter) { SFSNextRowIter *state = (SFSNextRowIter *)iter; if (!state) { - return code; + TAOS_RETURN(code); } if (state->pLastIter) { @@ -2974,7 +2710,7 @@ int32_t clearNextRowFromFS(void *iter) { state->pRowIter->pSkyline = NULL; } - return code; + TAOS_RETURN(code); } static void clearLastFileSet(SFSNextRowIter *state) { @@ -3016,7 +2752,7 @@ static void clearLastFileSet(SFSNextRowIter *state) { static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs, SCacheRowsReader *pr) { - int code = 0; + int32_t code = 0, lino = 0; STbData *pMem = NULL; if (pReadSnap->pMem) { @@ -3032,10 +2768,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->pMemDelData = NULL; - code = loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err); pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; @@ -3071,12 +2804,13 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs } pIter->pr = pr; + _err: - return code; + TAOS_RETURN(code); } static int32_t nextRowIterClose(CacheNextRowIter *pIter) { - int code = 0; + int32_t code = 0; for (int i = 0; i < 3; ++i) { if (pIter->input[i].nextRowClearFn) { @@ -3093,19 +2827,20 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { } _err: - return code; + TAOS_RETURN(code); } // iterate next row non deleted backward ts, version (from high to low) static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, int nCols) { - int code = 0; + int32_t code = 0, lino = 0; + for (;;) { for (int i = 0; i < 3; ++i) { if (pIter->input[i].next && !pIter->input[i].stop) { - code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs, - isLast, aCols, nCols); - if (code) goto _err; + TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, + &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols), + &lino, _err); if (pIter->input[i].pRow == NULL) { pIter->input[i].stop = true; @@ -3118,7 +2853,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI *ppRow = NULL; *pIgnoreEarlierTs = (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs); - return code; + + TAOS_RETURN(code); } // select maxpoint(s) from mem, imem, fs and last @@ -3184,34 +2920,24 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI pIter->input[iMerge[0]].next = true; *ppRow = merge[0]; - return code; + + TAOS_RETURN(code); } } _err: - return code; -} -#ifdef BUILD_NO_CALL -static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) { - SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol)); - if (NULL == pColArray) { - return TSDB_CODE_OUT_OF_MEMORY; + if (code) { + tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); } - for (int32_t i = 0; i < pTSchema->numOfCols; ++i) { - SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[i].colId, pTSchema->columns[i].type)}; - taosArrayPush(pColArray, &col); - } - *ppColArray = pColArray; - return TSDB_CODE_SUCCESS; + TAOS_RETURN(code); } -#endif static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) { SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol)); if (NULL == pColArray) { - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } for (int32_t i = 0; i < nCols; ++i) { @@ -3221,34 +2947,38 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, taosArrayPush(pColArray, &col); } *ppColArray = pColArray; - return TSDB_CODE_SUCCESS; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; *ppDst = taosMemoryMalloc(len); if (NULL == *ppDst) { - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } memcpy(*ppDst, pSrc, len); - return TSDB_CODE_SUCCESS; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) { if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) { - return cloneTSchema(pReader->pSchema, &pReader->pCurrSchema); + TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema)); } if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) { - return TSDB_CODE_SUCCESS; + TAOS_RETURN(TSDB_CODE_SUCCESS); } taosMemoryFreeClear(pReader->pCurrSchema); - return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema); + TAOS_RETURN( + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema)); } static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds) { + int32_t code = 0, lino = 0; STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); int16_t nLastCol = nCols; int16_t noneCol = 0; @@ -3258,15 +2988,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC SArray *pColArray = NULL; SColVal *pColVal = &(SColVal){0}; - int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols); - if (TSDB_CODE_SUCCESS != code) { - return code; - } + TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols)); + SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t)); if (NULL == aColArray) { taosArrayDestroy(pColArray); - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } for (int i = 0; i < nCols; ++i) { @@ -3291,10 +3019,8 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC int32_t sversion = TSDBROW_SVERSION(pRow); if (sversion != -1) { - code = updateTSchema(sversion, pr, uid); - if (TSDB_CODE_SUCCESS != code) { - goto _err; - } + TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err); + pTSchema = pr->pCurrSchema; } // int16_t nCol = pTSchema->numOfCols; @@ -3302,7 +3028,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC STsdbRowKey rowKey = {0}; tsdbRowGetKey(pRow, &rowKey); - if (lastRowKey.key.ts == TSKEY_MAX) { // first time + if (lastRowKey.key.ts == TSKEY_MAX) { // first time lastRowKey = rowKey; for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { @@ -3334,9 +3060,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); if (pCol->colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } else { @@ -3388,9 +3112,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC if (pColVal->value.nData > 0) { lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } else { @@ -3423,7 +3145,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC nextRowIterClose(&iter); taosArrayDestroy(aColArray); - return code; + TAOS_RETURN(code); _err: nextRowIterClose(&iter); @@ -3431,11 +3153,18 @@ _err: *ppLastArray = NULL; taosArrayDestroy(pColArray); taosArrayDestroy(aColArray); - return code; + + if (code) { + tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + + TAOS_RETURN(code); } static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds) { + int32_t code = 0, lino = 0; STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); int16_t nLastCol = nCols; int16_t noneCol = 0; @@ -3445,15 +3174,13 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SArray *pColArray = NULL; SColVal *pColVal = &(SColVal){0}; - int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols); - if (TSDB_CODE_SUCCESS != code) { - return code; - } + TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols)); + SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t)); if (NULL == aColArray) { taosArrayDestroy(pColArray); - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } for (int i = 0; i < nCols; ++i) { @@ -3476,10 +3203,8 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, int32_t sversion = TSDBROW_SVERSION(pRow); if (sversion != -1) { - code = updateTSchema(sversion, pr, uid); - if (TSDB_CODE_SUCCESS != code) { - goto _err; - } + TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err); + pTSchema = pr->pCurrSchema; } // int16_t nCol = pTSchema->numOfCols; @@ -3512,9 +3237,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); if (pCol->colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } if (pColVal->value.nData > 0) { memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); @@ -3546,7 +3269,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, nextRowIterClose(&iter); taosArrayDestroy(aColArray); - return code; + TAOS_RETURN(code); _err: nextRowIterClose(&iter); @@ -3554,7 +3277,13 @@ _err: *ppLastArray = NULL; taosArrayDestroy(pColArray); taosArrayDestroy(aColArray); - return code; + + if (code) { + tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + + TAOS_RETURN(code); } int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { @@ -3591,93 +3320,6 @@ int32_t tsdbCacheGetElems(SVnode *pVnode) { return elems; } -#if 0 -static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) { - struct { - int32_t fid; - int64_t commitID; - } biKey = {0}; - - biKey.fid = fid; - biKey.commitID = commitID; - - *len = sizeof(biKey); - memcpy(key, &biKey, *len); -} - -static int32_t tsdbCacheLoadBlockIdx(SDataFReader *pFileReader, SArray **aBlockIdx) { - SArray *pArray = taosArrayInit(8, sizeof(SBlockIdx)); - int32_t code = tsdbReadBlockIdx(pFileReader, pArray); - - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pArray); - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } - - *aBlockIdx = pArray; - - return code; -} - -static void deleteBICache(const void *key, size_t keyLen, void *value, void *ud) { - (void)ud; - SArray *pArray = (SArray *)value; - - taosArrayDestroy(pArray); -} - -int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle) { - int32_t code = 0; - char key[128] = {0}; - int keyLen = 0; - - getBICacheKey(pFileReader->pSet->fid, pFileReader->pSet->pHeadF->commitID, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (!h) { - STsdb *pTsdb = pFileReader->pTsdb; - taosThreadMutexLock(&pTsdb->biMutex); - - h = taosLRUCacheLookup(pCache, key, keyLen); - if (!h) { - SArray *pArray = NULL; - code = tsdbCacheLoadBlockIdx(pFileReader, &pArray); - // if table's empty or error, return code of -1 - if (code != TSDB_CODE_SUCCESS || pArray == NULL) { - taosThreadMutexUnlock(&pTsdb->biMutex); - - *handle = NULL; - return 0; - } - - size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray); - _taos_lru_deleter_t deleter = deleteBICache; - LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - } - - taosThreadMutexUnlock(&pTsdb->biMutex); - } - - tsdbTrace("bi cache:%p, ref", pCache); - *handle = h; - - return code; -} - -int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) { - int32_t code = 0; - - taosLRUCacheRelease(pCache, h, false); - tsdbTrace("bi cache:%p, release", pCache); - - return code; -} -#endif - // block cache static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) { struct { @@ -3696,22 +3338,10 @@ static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { int32_t code = 0; - /* - uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage); - if (pBlock == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - */ - int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock); - if (code != TSDB_CODE_SUCCESS) { - // taosMemoryFree(pBlock); - // code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } - //*ppBlock = pBlock; + int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; + + TAOS_CHECK_RETURN(s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock)); tsdbTrace("block:%p load from s3", *ppBlock); @@ -3749,7 +3379,8 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) if (code == TSDB_CODE_SUCCESS && !pBlock) { code = TSDB_CODE_OUT_OF_MEMORY; } - return code; + + TAOS_RETURN(code); } size_t charge = tsS3BlockSize * pFD->szPage; @@ -3757,7 +3388,7 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL); if (status != TAOS_LRU_STATUS_OK) { - code = -1; + // code = -1; } } @@ -3766,7 +3397,7 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) *handle = h; - return code; + TAOS_RETURN(code); } int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) { @@ -3793,6 +3424,9 @@ int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_ size_t charge = pFD->szPage; _taos_lru_deleter_t deleter = deleteBCache; uint8_t *pPg = taosMemoryMalloc(charge); + if (!pPg) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } memcpy(pPg, pPage, charge); LRUStatus status = @@ -3806,5 +3440,5 @@ int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_ tsdbCacheRelease(pFD->pTsdb->pgCache, handle); - return code; + TAOS_RETURN(code); } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 19c8837d83..57c70d8df1 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -51,7 +51,12 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { taosLRUCacheSetStrictCapacity(pLogStore->pCache, false); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); - ASSERT(pLogStore->data != NULL); + if (!pLogStore->data) { + taosMemoryFree(pLogStore); + taosLRUCacheCleanup(pLogStore->pCache); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } SSyncLogStoreData* pData = pLogStore->data; pData->pSyncNode = pSyncNode; @@ -60,7 +65,13 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { taosThreadMutexInit(&(pData->mutex), NULL); pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0); - ASSERT(pData->pWalHandle != NULL); + if (!pData->pWalHandle) { + taosMemoryFree(pLogStore); + taosLRUCacheCleanup(pLogStore->pCache); + taosThreadMutexDestroy(&(pData->mutex)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; pLogStore->syncLogCommitIndex = raftlogCommitIndex; @@ -110,7 +121,7 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI SWal* pWal = pData->pWal; int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); @@ -118,10 +129,10 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI sNError(pData->pSyncNode, "wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, err, errStr, sysErr, sysErrStr); - return -1; + TAOS_RETURN(err); } - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { @@ -224,7 +235,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", pEntry->index, err, errStr, sysErr, sysErrStr); - return -1; + + TAOS_RETURN(err); } code = walFsync(pWal, forceSync); @@ -235,7 +247,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } // entry found, return 0 @@ -253,10 +265,10 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR SWalReader* pWalHandle = pData->pWalHandle; if (pWalHandle == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId); taosThreadMutexUnlock(&(pData->mutex)); - return -1; + + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); } int64_t ts2 = taosGetTimestampNs(); @@ -266,7 +278,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR // code = walReadVerCached(pWalHandle, index); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); @@ -286,7 +298,8 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR */ taosThreadMutexUnlock(&(pData->mutex)); - return code; + + TAOS_RETURN(code); } *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); @@ -319,7 +332,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR ", elapsed-build:%" PRId64, index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild); - return code; + TAOS_RETURN(code); } // truncate semantic @@ -329,7 +342,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn int32_t code = walRollback(pWal, fromIndex); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); @@ -339,7 +352,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn // event log sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex); - return code; + + TAOS_RETURN(code); } // entry found, return 0 @@ -352,16 +366,16 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp *ppLastEntry = NULL; if (walIsEmpty(pWal)) { - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } else { SyncIndex lastIndex = raftLogLastIndex(pLogStore); ASSERT(lastIndex >= SYNC_INDEX_BEGIN); int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); - return code; + + TAOS_RETURN(code); } - return -1; + TAOS_RETURN(TSDB_CODE_FAILED); } int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { @@ -375,20 +389,22 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { if (index < snapshotVer || index > wallastVer) { // ignore - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t code = walCommit(pWal, index); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr); - return -1; + + TAOS_RETURN(code); } - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) { @@ -405,5 +421,6 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; + return walGetCommittedVer(pWal); } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index c200c6cb4b..7f702c3766 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -25,17 +25,17 @@ static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) { int32_t code = 0; tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code); - if (code < 0) return -1; + if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED); tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code); - if (code < 0) return -1; + if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED); tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code); - if (code < 0) return -1; + if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t raftStoreReadFile(SSyncNode *pNode) { - int32_t code = -1; + int32_t code = -1, lino = 0; TdFilePtr pFile = NULL; char *pData = NULL; SJson *pJson = NULL; @@ -52,41 +52,38 @@ int32_t raftStoreReadFile(SSyncNode *pNode) { pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr()); - goto _OVER; + + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); } int64_t size = 0; if (taosFStatFile(pFile, &size, NULL) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr()); - goto _OVER; + + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); } pData = taosMemoryMalloc(size + 1); if (pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER); } if (taosReadFile(pFile, pData, size) != size) { - terrno = TAOS_SYSTEM_ERROR(errno); sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr()); - goto _OVER; + + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); } pData[size] = '\0'; pJson = tjsonParse(pData); if (pJson == NULL) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; - goto _OVER; + TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER); } if (raftStoreDecode(pJson, pStore) < 0) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; - goto _OVER; + TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER); } code = 0; @@ -100,18 +97,20 @@ _OVER: if (code != 0) { sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr()); } - return code; + + TAOS_RETURN(code); } static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) { - if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) return -1; - if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) return -1; - return 0; + if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) TAOS_RETURN(TSDB_CODE_FAILED); + if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) TAOS_RETURN(TSDB_CODE_FAILED); + if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) TAOS_RETURN(TSDB_CODE_FAILED); + + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t raftStoreWriteFile(SSyncNode *pNode) { - int32_t code = -1; + int32_t code = -1, lino = 0; char *buffer = NULL; SJson *pJson = NULL; TdFilePtr pFile = NULL; @@ -120,23 +119,23 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) { char file[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s.bak", realfile); - terrno = TSDB_CODE_OUT_OF_MEMORY; pJson = tjsonCreateObject(); - if (pJson == NULL) goto _OVER; - if (raftStoreEncode(pJson, pStore) != 0) goto _OVER; + if (pJson == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER); + if (raftStoreEncode(pJson, pStore) != 0) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER); + buffer = tjsonToString(pJson); - if (buffer == NULL) goto _OVER; - terrno = 0; + if (buffer == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER); pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pFile == NULL) goto _OVER; + if (pFile == NULL) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); int32_t len = strlen(buffer); - if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; - if (taosFsyncFile(pFile) < 0) goto _OVER; + if (taosWriteFile(pFile, buffer, len) <= 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); + + if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); taosCloseFile(&pFile); - if (taosRenameFile(file, realfile) != 0) goto _OVER; + if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); code = 0; sInfo("vgId:%d, succeed to write raft store file:%s, term:%" PRId64, pNode->vgId, realfile, pStore->currentTerm); @@ -147,7 +146,6 @@ _OVER: if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { - if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr()); } return code; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 9312719be7..dfe9f51af2 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -52,7 +52,8 @@ int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) { SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); syncLogReplReset(pMgr); taosThreadMutexUnlock(&pBuf->mutex); - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t syncNodeReplicate(SSyncNode* pNode) { @@ -60,13 +61,14 @@ int32_t syncNodeReplicate(SSyncNode* pNode) { taosThreadMutexLock(&pBuf->mutex); int32_t ret = syncNodeReplicateWithoutLock(pNode); taosThreadMutexUnlock(&pBuf->mutex); - return ret; + + TAOS_RETURN(ret); } int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) || pNode->raftCfg.cfg.totalReplicaNum == 1) { - return -1; + TAOS_RETURN(TSDB_CODE_FAILED); } for (int32_t i = 0; i < pNode->totalReplicaNum; i++) { if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) { @@ -75,14 +77,16 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; (void)syncLogReplStart(pMgr, pNode); } - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = pRpcMsg->pCont; pMsg->destId = *destRaftId; syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { @@ -112,5 +116,5 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 77e5498127..0e50cca94c 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -20,7 +20,6 @@ #include "syncRaftStore.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "syncUtil.h" // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == @@ -95,7 +94,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) { syncLogRecvRequestVote(ths, pMsg, -1, "not in my config"); - return -1; + + TAOS_RETURN(TSDB_CODE_FAILED); } bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); @@ -122,8 +122,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // send msg SRpcMsg rpcMsg = {0}; - ret = syncBuildRequestVoteReply(&rpcMsg, ths->vgId); - ASSERT(ret == 0); + + TAOS_CHECK_RETURN(syncBuildRequestVoteReply(&rpcMsg, ths->vgId)); SyncRequestVoteReply* pReply = rpcMsg.pCont; pReply->srcId = ths->myRaftId; @@ -138,5 +138,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); if (resetElect) syncNodeResetElectTimer(ths); - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 25c9f813a6..10d9a6c96b 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -45,19 +45,22 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { syncLogRecvRequestVoteReply(ths, pMsg, "not in my config"); - return -1; + + TAOS_RETURN(TSDB_CODE_FAILED); } SyncTerm currentTerm = raftStoreGetTerm(ths); // drop stale response if (pMsg->term < currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); - return -1; + + TAOS_RETURN(TSDB_CODE_FAILED); } if (pMsg->term > currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); - return -1; + + TAOS_RETURN(TSDB_CODE_FAILED); } syncLogRecvRequestVoteReply(ths, pMsg, ""); @@ -69,7 +72,8 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (ths->pVotesRespond->term != pMsg->term) { sNError(ths, "vote respond error vote-respond-mgr term:%" PRIu64 ", msg term:%" PRIu64 "", ths->pVotesRespond->term, pMsg->term); - return -1; + + TAOS_RETURN(TSDB_CODE_FAILED); } votesRespondAdd(ths->pVotesRespond, pMsg); @@ -93,5 +97,5 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } } - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 3de159797f..4f8e96c4c7 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -20,6 +20,7 @@ #include "tarray.h" #include "tdef.h" #include "tlog.h" +#include "tutil.h" typedef struct SLRUEntry SLRUEntry; typedef struct SLRUEntryTable SLRUEntryTable; @@ -114,13 +115,13 @@ static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { table->lengthBits = 4; table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *)); if (!table->list) { - return -1; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } table->elems = 0; table->maxLengthBits = maxUpperHashBits; - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) { @@ -349,9 +350,7 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity) static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio, int maxUpperHashBits) { - if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) { - return -1; - } + TAOS_CHECK_RETURN(taosLRUEntryTableInit(&shard->table, maxUpperHashBits)); taosThreadMutexInit(&shard->mutex, NULL); @@ -372,7 +371,7 @@ static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool st taosLRUCacheShardSetCapacity(shard, capacity); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { @@ -671,16 +670,13 @@ static int getDefaultCacheShardBits(size_t capacity) { SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) { if (numShardBits >= 20) { - terrno = TSDB_CODE_INVALID_PARA; return NULL; } if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) { - terrno = TSDB_CODE_INVALID_PARA; return NULL; } SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache)); if (!cache) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -692,14 +688,15 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); if (!cache->shards) { taosMemoryFree(cache); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } bool strictCapacity = 1; size_t perShard = (capacity + (numShards - 1)) / numShards; for (int i = 0; i < numShards; ++i) { - taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits); + if (TSDB_CODE_SUCCESS != + taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits)) + return NULL; } cache->numShards = numShards;