diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c704472dff..95bce32196 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -346,6 +346,7 @@ struct STsdbFS { typedef struct { rocksdb_t *db; rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 69eacfa46e..94e5f253bf 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -178,6 +178,7 @@ int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); +int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); @@ -194,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); + int32_t type); int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index f8077d08eb..17aed62241 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -98,12 +98,19 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { goto _err3; } + rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); + if (NULL == flushoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err4; + } + rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.readoptions = readoptions; + pTsdb->rCache.flushoptions = flushoptions; pTsdb->rCache.db = db; taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); @@ -122,6 +129,7 @@ _err: static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); + rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); @@ -129,6 +137,20 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosThreadMutexDestroy(&pTsdb->rCache.rMutex); } +int32_t tsdbCacheCommit(STsdb *pTsdb) { + int32_t code = 0; + char *err = NULL; + + rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + code = -1; + } + + return code; +} + SLastCol *tsdbCacheDeserialize(char const *value) { if (!value) { return NULL; @@ -365,6 +387,21 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } // maybe store it back to rocks cache + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); + rocksdb_writebatch_put(wb, key, klen, value, vlen); + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + taosMemoryFree(value); } taosArrayPush(pLastArray, pLastCol); @@ -2256,7 +2293,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC if (lastRowTs == TSKEY_MAX) { lastRowTs = rowTs; - for (int16_t iCol = 0; iCol < nCols; ++iCol) { + for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { if (iCol >= nLastCol) { break; } @@ -2264,6 +2301,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { continue; } + if (slotIds[iCol] == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs}); + taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}); + continue; + } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); *pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal}; @@ -2336,10 +2380,6 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC } } while (setNoneCol); - // if (taosArrayGetSize(pColArray) <= 0) { - //*ppLastArray = NULL; - // taosArrayDestroy(pColArray); - //} else { if (!hasRow) { if (ignoreEarlierTs) { taosArrayDestroy(pColArray); @@ -2349,11 +2389,10 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC } } *ppLastArray = pColArray; - //} nextRowIterClose(&iter); taosArrayDestroy(aColArray); - // taosMemoryFreeClear(pTSchema); + return code; _err: diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 847125018c..74168591d2 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -144,8 +144,8 @@ _exit: } int vnodeShouldCommit(SVnode *pVnode, bool atExit) { - bool diskAvail = osDataSpaceAvailable(); - bool needCommit = false; + bool diskAvail = osDataSpaceAvailable(); + bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { @@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { code = tsdbCommit(pVnode->pTsdb, pInfo); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbCacheCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + if (VND_IS_RSMA(pVnode)) { code = smaCommit(pVnode->pSma, pInfo); TSDB_CHECK_CODE(code, lino, _exit);