diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7271247ef2..bcfeb77ff9 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -20,6 +20,28 @@ #include "tcommon.h" #include "tref.h" +typedef struct SDbChkp { + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + + char* buf; + int32_t len; + + // ping-pong buf + SHashObj* pSstTbl[2]; + int8_t idx; + + SArray* pAdd; + SArray* pDel; + int8_t update; + + TdThreadRwlock rwLock; +} SDbChkp; typedef struct { int8_t init; char* pCurrent; @@ -39,6 +61,10 @@ typedef struct { SArray* pAdd; SArray* pDel; int8_t update; + + SHashObj *pDbChkpTbl; + + TdThreadRwlock rwLock; } SBackendManager; typedef struct SCompactFilteFactory { @@ -145,42 +171,23 @@ void destroyFunc(void* arg); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); -SBackendManager* bkdMgtCreate(char* path) { - SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); - p->curChkpId = 0; - p->preCkptId = 0; - p->pSST = taosArrayInit(64, sizeof(void*)); - p->path = taosStrdup(path); - p->len = strlen(path) + 128; - p->buf = taosMemoryCalloc(1, p->len); - p->idx = 0; - p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); +void dbChkpDestroy(SDbChkp* pChkp) { + taosMemoryFree(pChkp->buf); + taosMemoryFree(pChkp->path); - p->pAdd = taosArrayInit(64, sizeof(void*)); - p->pDel = taosArrayInit(64, sizeof(void*)); - p->update = 0; - return p; + taosArrayDestroyP(pChkp->pSST, taosMemoryFree); + taosArrayDestroyP(pChkp->pAdd, taosMemoryFree); + taosArrayDestroyP(pChkp->pDel, taosMemoryFree); + + taosHashCleanup(pChkp->pSstTbl[0]); + taosHashCleanup(pChkp->pSstTbl[1]); + + taosMemoryFree(pChkp->pCurrent); + taosMemoryFree(pChkp->pManifest); + } -void bkdMgtDestroy(SBackendManager* bm) { - if (bm == NULL) return; - taosMemoryFree(bm->buf); - taosMemoryFree(bm->path); - - taosArrayDestroyP(bm->pSST, taosMemoryFree); - taosArrayDestroyP(bm->pAdd, taosMemoryFree); - taosArrayDestroyP(bm->pDel, taosMemoryFree); - - taosHashCleanup(bm->pSstTbl[0]); - taosHashCleanup(bm->pSstTbl[1]); - - taosMemoryFree(bm->pCurrent); - taosMemoryFree(bm->pManifest); - - taosMemoryFree(bm); -} int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { int32_t code = 0; @@ -204,7 +211,12 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { return code; } -int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { + +int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) { + taosThreadRwlockWrlock(&p->rwLock); + + p->preCkptId = p->curChkpId; + p->curChkpId = chkpId; const char* pCurrent = "CURRENT"; int32_t currLen = strlen(pCurrent); @@ -214,81 +226,112 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { const char* pSST = ".sst"; int32_t sstLen = strlen(pSST); - memset(bm->buf, 0, bm->len); - sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); + memset(p->buf, 0, p->len); + sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId); - taosArrayClearP(bm->pAdd, taosMemoryFree); - taosArrayClearP(bm->pDel, taosMemoryFree); + taosArrayClearP(p->pAdd, taosMemoryFree); + taosArrayClearP(p->pDel, taosMemoryFree); - TdDirPtr pDir = taosOpenDir(bm->buf); + TdDirPtr pDir = taosOpenDir(p->buf); TdDirEntryPtr de = NULL; int8_t dummy = 0; while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { - taosMemoryFreeClear(bm->pCurrent); - bm->pCurrent = taosStrdup(name); - taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); + taosMemoryFreeClear(p->pCurrent); + p->pCurrent = taosStrdup(name); + taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { - taosMemoryFreeClear(bm->pManifest); - bm->pManifest = taosStrdup(name); - taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); + taosMemoryFreeClear(p->pManifest); + p->pManifest = taosStrdup(name); + taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { // char* p = taosStrdup(name); - taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); + taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } } - if (bm->init == 0) { - bm->preCkptId = -1; - bm->curChkpId = chkpId; - bm->init = 1; + if (p->init == 0) { + p->preCkptId = -1; + p->curChkpId = chkpId; + p->init = 1; - void* pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], NULL); + void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); while (pIter) { size_t len; char* name = taosHashGetKey(pIter, &len); if (name != NULL && len != 0) { - taosArrayPush(bm->pAdd, &name); + taosArrayPush(p->pAdd, &name); } - pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter); + pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); } - if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1; + if (taosArrayGetSize(p->pAdd) > 0) p->update = 1; } else { - int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); + int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel); if (code != 0) { // dead code - taosArrayClearP(bm->pAdd, taosMemoryFree); - taosArrayClearP(bm->pDel, taosMemoryFree); - taosHashClear(bm->pSstTbl[1 - bm->idx]); - bm->update = 0; + taosArrayClearP(p->pAdd, taosMemoryFree); + taosArrayClearP(p->pDel, taosMemoryFree); + taosHashClear(p->pSstTbl[1 - p->idx]); + p->update = 0; taosCloseDir(&pDir); return code; } - bm->preCkptId = bm->curChkpId; - bm->curChkpId = chkpId; - if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) { - bm->update = 0; + p->preCkptId = p->curChkpId; + p->curChkpId = chkpId; + if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) { + p->update = 0; } } - taosHashClear(bm->pSstTbl[bm->idx]); - bm->idx = 1 - bm->idx; + taosHashClear(p->pSstTbl[p->idx]); + p->idx = 1 - p->idx; taosCloseDir(&pDir); + taosThreadRwlockUnlock(&p->rwLock); return 0; + +} +SDbChkp* dbChktCreate(char* path, int64_t initChkpId) { + SDbChkp *p = taosMemoryCalloc(1, sizeof(SDbChkp)); + p->curChkpId = initChkpId; + p->preCkptId = -1; + p->pSST = taosArrayInit(64, sizeof(void*)); + p->path = taosStrdup(path); + p->len = strlen(path) + 128; + p->buf = taosMemoryCalloc(1, p->len); + + p->idx = 0; + p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + + p->pAdd = taosArrayInit(64, sizeof(void*)); + p->pDel = taosArrayInit(64, sizeof(void*)); + p->update = 0; + taosThreadRwlockInit(&p->rwLock, NULL); + + SArray *list = NULL; + int32_t code = dbChkpGetDelta(p, initChkpId, list); + + return p; } -int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { +int32_t dbChkpInit(SDbChkp* p) { + if (p == NULL) return 0; + return 0; +} +int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { + + taosThreadRwlockRdlock(&p->rwLock); int32_t code = 0; - int32_t len = bm->len + 128; + int32_t len = p->len + 128; char* srcBuf = taosMemoryCalloc(1, len); char* dstBuf = taosMemoryCalloc(1, len); @@ -296,8 +339,8 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { char* srcDir = taosMemoryCalloc(1, len); char* dstDir = taosMemoryCalloc(1, len); - sprintf(srcDir, "%s%s%s%" PRId64 "", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId); - sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname); + sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId); + sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname); if (!taosDirExist(srcDir)) { stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); @@ -314,30 +357,30 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { // clear current file memset(dstBuf, 0, len); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); taosRemoveFile(dstBuf); memset(dstBuf, 0, len); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); taosRemoveFile(dstBuf); // add file to $name dir - for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) { + for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) { memset(dstBuf, 0, len); memset(srcBuf, 0, len); - char* filename = taosArrayGetP(bm->pAdd, i); + char* filename = taosArrayGetP(p->pAdd, i); sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); taosCopyFile(srcBuf, dstBuf); } // del file in $name - for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) { + for (int i = 0; i < taosArrayGetSize(p->pDel); i++) { memset(dstBuf, 0, len); memset(srcBuf, 0, len); - char* filename = taosArrayGetP(bm->pDel, i); + char* filename = taosArrayGetP(p->pDel, i); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); taosRemoveFile(dstBuf); } @@ -345,27 +388,92 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { // copy current file to dst dir memset(srcBuf, 0, len); memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pCurrent); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); taosCopyFile(srcBuf, dstBuf); // copy manifest file to dst dir memset(srcBuf, 0, len); memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pManifest); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); taosCopyFile(srcBuf, dstBuf); // clear delta data buf - taosArrayClearP(bm->pAdd, taosMemoryFree); - taosArrayClearP(bm->pDel, taosMemoryFree); + taosArrayClearP(p->pAdd, taosMemoryFree); + taosArrayClearP(p->pDel, taosMemoryFree); _ERROR: + taosThreadRwlockUnlock(&p->rwLock); taosMemoryFree(srcBuf); taosMemoryFree(dstBuf); taosMemoryFree(srcDir); taosMemoryFree(dstDir); return code; + +} +SBackendManager* bkdMgtCreate(char* path) { + SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); + p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + taosThreadRwlockInit(&p->rwLock, NULL); + return p; +} + +void bkdMgtDestroy(SBackendManager* bm) { + if (bm == NULL) return; + void *pIter = taosHashIterate(bm->pDbChkpTbl, NULL); + while (pIter) { + SDbChkp *pChkp = *(SDbChkp **)(pIter); + dbChkpDestroy(pChkp); + + pIter = taosHashIterate(bm->pDbChkpTbl, pIter); + } + + taosThreadRwlockDestroy(&bm->rwLock); + + taosMemoryFree(bm); +} +int32_t bkdMgtGetDelta(SBackendManager* bm, char *taskId, int64_t chkpId, SArray* list) { + int32_t code = 0; + taosThreadRwlockWrlock(&bm->rwLock); + + SDbChkp *pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + code = dbChkpGetDelta(pChkp, chkpId, list); + + taosThreadRwlockUnlock(&bm->rwLock); + return code ; +} + +int32_t bkdMgtAddChkp(SBackendManager *bm, char *task, char *path) { + int32_t code = -1; + + taosThreadRwlockWrlock(&bm->rwLock); + SDbChkp **pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); + if (pp == NULL) { + SDbChkp *p = dbChktCreate(path, 0); + if (p != NULL) { + taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void *)); + code = 0; + } + } else { + stError("task chkp already exists"); + } + + taosThreadRwlockUnlock(&bm->rwLock); + + return code; +} + +int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) { + int32_t code = 0; + taosThreadRwlockRdlock(&bm->rwLock); + + SDbChkp *p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + code = dbChkpDumpTo(p, dname); + + taosThreadRwlockUnlock(&bm->rwLock); + return code; + } SCfInit ginitDict[] = {