From 4104840ceaa909015131ef1ed7e224e77e42c931 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 13 Nov 2023 18:07:38 +0800 Subject: [PATCH] Merge branch '3.0' into enh/refactorBackend --- source/common/src/cos.c | 2 +- source/common/src/tglobal.c | 4 +- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/src/streamBackendRocksdb.c | 57 ++++++++++++------- source/libs/stream/src/streamCheckpoint.c | 2 +- 5 files changed, 43 insertions(+), 23 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 509bd31512..09b2b8e4db 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -467,7 +467,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { // data.infileFD = NULL; // data.noStatus = noStatus; - uError("ERROR: %s stat file %s: ", __func__, file); + // uError("ERROR: %s stat file %s: ", __func__, file); if (taosStatFile(file, &contentLength, NULL, NULL) < 0) { uError("ERROR: %s Failed to stat file %s: ", __func__, file); code = TAOS_SYSTEM_ERROR(errno); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a1566c23a5..8fe93ae201 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -247,7 +247,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 30; +int32_t tsStreamCheckpointInterval = 10; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 15; int32_t tsTtlUnit = 86400; @@ -721,7 +721,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 20, 1200, CFG_SCOPE_SERVER, + if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 1, 1200, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a101b6a09d..92ec1899db 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -77,6 +77,7 @@ typedef struct { SArray* chkpInUse; int32_t chkpCap; TdThreadRwlock chkpDirLock; + int64_t dataWritten; } STaskDbWrapper; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 3740d8fe56..0d18b6a900 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -891,6 +891,7 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { } code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); + pTaskDb->dataWritten = 0; pTaskDb->chkpId = chkpId; @@ -2161,7 +2162,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ + wrapper->dataWritten += 1; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ @@ -2237,7 +2239,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ + wrapper->dataWritten += 1; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ @@ -2277,6 +2280,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { stDebug("streamStateClear_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + wrapper->dataWritten += 1; char sKeyStr[128] = {0}; char eKeyStr[128] = {0}; @@ -3253,6 +3257,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + wrapper->dataWritten += 1; int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { @@ -3285,7 +3290,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); - STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + wrapper->dataWritten += 1; + rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); @@ -3303,6 +3310,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { char* err = NULL; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + wrapper->dataWritten += 1; rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err); if (err != NULL) { stError("streamState failed to write batch, err:%s", err); @@ -3401,6 +3409,26 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { return code; } +void hashTableToDebug(SHashObj* pTbl) { + size_t sz = taosHashGetSize(pTbl); + int32_t total = 0; + char* buf = taosMemoryCalloc(1, sz * 16); + void* pIter = taosHashIterate(pTbl, NULL); + while (pIter) { + size_t len = 0; + char* name = taosHashGetKey(pIter, &len); + char* tname = taosMemoryCalloc(1, len + 1); + memcpy(tname, name, len); + total += sprintf(buf + total, "%s,", tname); + + pIter = taosHashIterate(pTbl, pIter); + taosMemoryFree(tname); + } + buf[total - 1] = 0; + + stTrace("curr file list:[%s]", buf); + taosMemoryFree(buf); +} int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosThreadRwlockWrlock(&p->rwLock); @@ -3420,6 +3448,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosArrayClearP(p->pAdd, taosMemoryFree); taosArrayClearP(p->pDel, taosMemoryFree); + taosHashClear(p->pSstTbl[1 - p->idx]); TdDirPtr pDir = taosOpenDir(p->buf); TdDirEntryPtr de = NULL; @@ -3445,17 +3474,13 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { continue; } } + taosCloseDir(&pDir); - void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); - while (pIter) { - size_t len = 0; - char* name = taosHashGetKey(pIter, &len); + stTrace("chkp get file list: 1-1"); + hashTableToDebug(p->pSstTbl[1 - p->idx]); - char* buf = taosMemoryCalloc(1, len + 1); - strncpy(buf, name, len); - stError("curr file list: %s", buf); - pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); - } + stTrace("chkp get file list: 1-2"); + hashTableToDebug(p->pSstTbl[p->idx]); if (p->init == 0) { void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); @@ -3482,7 +3507,6 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosArrayClearP(p->pDel, taosMemoryFree); taosHashClear(p->pSstTbl[1 - p->idx]); p->update = 0; - taosCloseDir(&pDir); return code; } @@ -3493,11 +3517,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { p->preCkptId = p->curChkpId; p->curChkpId = chkpId; } - - taosHashClear(p->pSstTbl[p->idx]); p->idx = 1 - p->idx; - taosCloseDir(&pDir); taosThreadRwlockUnlock(&p->rwLock); return 0; @@ -3664,7 +3685,7 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, taosThreadRwlockWrlock(&bm->rwLock); SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); - SDbChkp* pChkp = NULL; + SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL; if (pChkp == NULL) { char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64); @@ -3679,8 +3700,6 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, code = dbChkpDumpTo(pChkp, dname); taosThreadRwlockUnlock(&bm->rwLock); return code; - } else { - pChkp = *ppChkp; } code = dbChkpGetDelta(pChkp, chkpId, list); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 946656c7d0..f7ac9e61bc 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -457,7 +457,7 @@ static int uploadCheckpointToS3(char* id, char* path) { return -1; } stDebug("[s3] upload checkpoint:%s", filename); - break; + // break; } taosCloseDir(&pDir);