diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 6b81ac87ee..ebeedcb5d2 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -136,7 +136,7 @@ void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); -int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId); +int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId, int64_t processver); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); @@ -144,7 +144,6 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId); void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy2(void* pBackend); -int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); @@ -249,7 +248,7 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t taskDbBuildSnap(void* arg, SArray* pSnap); int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); -int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); +int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); SBkdMgt* bkdMgtCreate(char* path); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c151193284..4915d4b122 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -140,7 +140,7 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest); int32_t valueToString(void* k, char* buf); int32_t valueIsStale(void* k, int64_t ts); -void destroyCompare(void* arg); +void destroyCompare(void* arg); static void cleanDir(const char* pPath, const char* id); static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); @@ -194,9 +194,7 @@ int32_t getCfIdx(const char* cfName) { return idx; } -bool isValidCheckpoint(const char* dir) { - return true; -} +bool isValidCheckpoint(const char* dir) { return true; } int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later @@ -486,9 +484,7 @@ _ERROR: return code; } -int32_t backendCopyFiles(const char* src, const char* dst) { - return backendFileCopyFilesImpl(src, dst); -} +int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); } static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId, const char* defaultPath) { @@ -540,7 +536,8 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId char* chkptPath = taosMemoryCalloc(1, pathLen); if (chkptId > 0) { - snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); + snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", + chkptId); code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath); if (code != 0) { @@ -549,11 +546,12 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId if (code != 0) { stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, - tstrerror(code), defaultPath); - code = 0; // reset the error code + tstrerror(code), defaultPath); + code = 0; // reset the error code } } else { // no valid checkpoint id - stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key, defaultPath); + stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key, + defaultPath); cleanDir(defaultPath, key); } @@ -1142,7 +1140,7 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { int64_t chkpId = pTaskDb->chkpId; taskDbRefChkp(pTaskDb, chkpId); - code = taskDbDoCheckpoint(pTaskDb, chkpId); + code = taskDbDoCheckpoint(pTaskDb, chkpId, 0); if (code != 0) { taskDbUnRefChkp(pTaskDb, chkpId); } @@ -1230,7 +1228,106 @@ int64_t taskGetDBRef(void* arg) { return pDb->refId; } -int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { +int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) { + TdFilePtr pFile = NULL; + int32_t code = -1; + + int32_t len = strlen(pChkpIdDir); + if (len == 0) { + terrno = TSDB_CODE_INVALID_PARA; + stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno)); + return -1; + } + + char* pDst = taosMemoryCalloc(1, len + 64); + if (pDst == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } + + if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) <= 0) { + code = -1; + stError("failed to build dst to load extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } + + pFile = taosOpenFile(pDst, TD_FILE_READ); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to open file to load extra info, file:%s", pDst); + goto _EXIT; + } + + char buf[256] = {0}; + if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); + code = -1; + goto _EXIT; + } + + if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) { + terrno = TSDB_CODE_INVALID_PARA; + stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); + } + code = 0; +_EXIT: + taosMemoryFree(pDst); + taosCloseFile(&pFile); + return code; +} +int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { + TdFilePtr pFile = NULL; + int32_t code = -1; + + int32_t len = strlen(pChkpIdDir); + if (len == 0) { + terrno = TSDB_CODE_INVALID_PARA; + stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno)); + return -1; + } + + char* pDst = taosMemoryCalloc(1, len + 64); + if (pDst == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } + + if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) < 0) { + stError("failed to build dst to add extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } + + pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to open file to add extra info, file:%s", pDst); + goto _EXIT; + } + + char buf[256] = {0}; + int n = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId); + if (n <= 0 || n >= sizeof(buf)) { + code = -1; + stError("failed to build content to add extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } + + if (taosWriteFile(pFile, buf, strlen(buf)) <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); + goto _EXIT; + } + code = 0; + +_EXIT: + taosCloseFile(&pFile); + taosMemoryFree(pDst); + return code; +} +int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) { STaskDbWrapper* pTaskDb = arg; int64_t st = taosGetTimestampMs(); int32_t code = -1; @@ -1254,32 +1351,58 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { int64_t written = atomic_load_64(&pTaskDb->dataWritten); + // flush db if (written > 0) { stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf); + if (code != 0) goto _EXIT; } else { stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); } + + // do checkpoint if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); + goto _EXIT; } else { stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, taosGetTimestampMs() - st); } + // add extra info to checkpoint + if ((code = chkpAddExtraInfo(pChkpIdDir, chkpId, processId)) != 0) { + stError("stream backend:%p failed to add extra info to checkpoint at:%s", pTaskDb, pChkpIdDir); + goto _EXIT; + } + + // delete ttl checkpoint code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); + if (code < 0) { + goto _EXIT; + } + atomic_store_64(&pTaskDb->dataWritten, 0); pTaskDb->chkpId = chkpId; _EXIT: - taosMemoryFree(pChkpDir); + + // clear checkpoint dir if failed + if (code != 0 && pChkpDir != NULL) { + if (taosDirExist(pChkpIdDir)) { + taosRemoveDir(pChkpIdDir); + } + } taosMemoryFree(pChkpIdDir); + taosMemoryFree(pChkpDir); + taosReleaseRef(taskDbWrapperId, refId); taosMemoryFree(ppCf); return code; } -int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); } +int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId, int64_t processVer) { + return taskDbDoCheckpoint(arg, chkpId, processVer); +} SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)backend; @@ -2205,7 +2328,8 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char return code; } -int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) { +int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, + const char* idStr) { int32_t code = 0; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; @@ -2224,7 +2348,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } -int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) { +int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, + const char* idStr) { int32_t code = -1; STaskDbWrapper* pDb = arg; ECHECKPOINT_BACKUP_TYPE utype = type; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1fddb5a97d..af7e969c07 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -24,11 +24,13 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); -static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); +static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, + int32_t transId); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static void checkpointTriggerMonitorFn(void* param, void* tmrId); -static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); +static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, + int32_t transId); SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) { @@ -96,7 +98,7 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri if (pRsp->rspCode != TSDB_CODE_SUCCESS) { stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, - pRsp->upstreamTaskId, tstrerror(pRsp->rspCode)); + pRsp->upstreamTaskId, tstrerror(pRsp->rspCode)); return TSDB_CODE_SUCCESS; } @@ -108,7 +110,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId SRpcHandleInfo* pRpcInfo, int32_t code) { int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); - void* pBuf = rpcMallocCont(size); + void* pBuf = rpcMallocCont(size); SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); @@ -162,15 +164,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock taosThreadMutexLock(&pTask->lock); if (pTask->chkInfo.checkpointId > checkpointId) { stError("s-task:%s vgId:%d current checkpointId:%" PRId64 - " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", - id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); + " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", + id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_SUCCESS; } if (pTask->chkInfo.checkpointId == checkpointId) { { // send checkpoint-ready msg to upstream - SRpcMsg msg ={0}; + SRpcMsg msg = {0}; SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId); initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg); @@ -362,7 +364,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId taosThreadMutexUnlock(&pInfo->lock); if (notReady == 0) { - stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", id); + stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", + id); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId); } @@ -371,11 +374,11 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - int64_t now = taosGetTimestampMs(); - int32_t numOfConfirmed = 0; + int64_t now = taosGetTimestampMs(); + int32_t numOfConfirmed = 0; taosThreadMutexLock(&pInfo->lock); - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { pReadyInfo->sendCompleted = 1; @@ -385,7 +388,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream } } - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo->sendCompleted == 1) { numOfConfirmed += 1; @@ -568,12 +571,12 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { - char* path = NULL; - int32_t code = 0; - SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); - int64_t now = taosGetTimestampMs(); - SStreamMeta* pMeta = pTask->pMeta; - const char* idStr = pTask->id.idStr; + char* path = NULL; + int32_t code = 0; + SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); + int64_t now = taosGetTimestampMs(); + SStreamMeta* pMeta = pTask->pMeta; + const char* idStr = pTask->id.idStr; if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, pTask->id.idStr)) != 0) { @@ -619,8 +622,8 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d idStr, checkpointId, el, path); taosRemoveDir(path); } else { - stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", - idStr, checkpointId, el); + stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr, + checkpointId, el); } taosMemoryFree(path); @@ -639,9 +642,10 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI } int64_t dbRefId = taskGetDBRef(pTask->pBackend); - void* pBackend = taskAcquireDb(dbRefId); + void* pBackend = taskAcquireDb(dbRefId); if (pBackend == NULL) { - stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr); + stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", + pTask->id.idStr); return -1; } @@ -663,7 +667,8 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); - code = streamBackendDoCheckpoint(pTask->pBackend, ckId); + int64_t ver = 0; + code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); } @@ -773,11 +778,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); bool recved = false; - for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) { + for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) { STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j); if (pInfo->nodeId == pReady->upstreamNodeId) { recved = true; @@ -785,7 +790,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } } - if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message + if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message streamTaskOpenUpstreamInput(pTask, pInfo->taskId); taosArrayPush(pNotSendList, pInfo); } @@ -870,7 +875,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) return false; } - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); if (pSendInfo->nodeId != downstreamNodeId) { continue; @@ -939,10 +944,10 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { int32_t num = 0; taosThreadMutexLock(&pInfo->lock); - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p->recved) { - num ++; + num++; } } taosThreadMutexUnlock(&pInfo->lock); diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 2fb257fe4e..38d48a2a32 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -29,7 +29,7 @@ class BackendEnv : public ::testing::Test { void *backendCreate() { const char *streamPath = "/tmp"; - void * p = NULL; + void *p = NULL; // char *absPath = NULL; // // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2); @@ -52,7 +52,7 @@ SStreamState *stateCreate(const char *path) { } void *backendOpen() { streamMetaInit(); - const char * path = "/tmp/backend"; + const char *path = "/tmp/backend"; SStreamState *p = stateCreate(path); ASSERT(p != NULL); @@ -79,7 +79,7 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; - char * newVal = NULL; + char *newVal = NULL; streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(len == strlen(val)); } @@ -100,7 +100,7 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; - char * newVal = NULL; + char *newVal = NULL; int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(code != 0); } @@ -130,7 +130,7 @@ void *backendOpen() { winkey.groupId = 0; winkey.ts = tsArray[0]; - char * val = NULL; + char *val = NULL; int32_t len = 0; pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey); @@ -157,7 +157,7 @@ void *backendOpen() { key.ts = tsArray[i]; key.exprIdx = i; - char * val = NULL; + char *val = NULL; int32_t len = 0; streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); ASSERT(len == strlen("Value")); @@ -168,7 +168,7 @@ void *backendOpen() { key.ts = tsArray[i]; key.exprIdx = i; - char * val = NULL; + char *val = NULL; int32_t len = 0; streamStateFuncDel_rocksdb(p, &key); } @@ -213,7 +213,7 @@ void *backendOpen() { { SSessionKey key; memset(&key, 0, sizeof(key)); - char * val = NULL; + char *val = NULL; int32_t vlen = 0; code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); @@ -260,7 +260,7 @@ void *backendOpen() { SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; key.groupId = (uint64_t)(i); key.ts = tsArray[i]; - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); taosMemoryFreeClear(val); @@ -272,7 +272,7 @@ void *backendOpen() { SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); ASSERT(pCurr != NULL); - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); ASSERT(vlen == strlen("Value")); @@ -296,7 +296,7 @@ void *backendOpen() { SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; key.groupId = (uint64_t)(i); key.ts = tsArray[i]; - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); taosMemoryFreeClear(val); @@ -338,7 +338,7 @@ void *backendOpen() { char key[128] = {0}; sprintf(key, "tbname_%d", i); - char * val = NULL; + char *val = NULL; int32_t len = 0; code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len); ASSERT(code == 0); @@ -354,7 +354,7 @@ TEST_F(BackendEnv, checkOpen) { SStreamState *p = (SStreamState *)backendOpen(); int64_t tsStart = taosGetTimestampMs(); { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; for (int i = 0; i < size; i++) { char key[128] = {0}; @@ -368,7 +368,7 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -383,9 +383,9 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } // do checkpoint 2 - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2); + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0); { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -400,17 +400,17 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3); + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0); const char *path = "/tmp/backend/stream"; const char *dump = "/tmp/backend/stream/dump"; // taosMkDir(dump); taosMulMkDir(dump); SBkdMgt *mgt = bkdMgtCreate((char *)path); - SArray * result = taosArrayInit(4, sizeof(void *)); + SArray *result = taosArrayInit(4, sizeof(void *)); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4); + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); taosArrayClear(result); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump);