diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index f1904850d7..4efba478f3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -140,17 +140,12 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); -//#define CHECKPOINT_PATH_LEN 128 -//typedef struct SChekpointDataHeader{ -// int64_t size; -// char name[CHECKPOINT_PATH_LEN]; -// char id[CHECKPOINT_PATH_LEN]; -//} SChekpointDataHeader; typedef enum UPLOAD_TYPE{ UPLOAD_DISABLE = -1, UPLOAD_S3 = 0, UPLOAD_RSYNC = 1, } UPLOAD_TYPE; + UPLOAD_TYPE getUploadType(); int uploadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3cfdd65a77..5479a2dab2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -375,98 +375,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } - -//static int64_t kBlockSize = 64 * 1024; -//static int sendCheckpointToS3(char* id, SArray* fileList){ -// code = s3PutObjectFromFile2(from->fname, object_name); -// return 0; -//} -//static int sendCheckpointToSnode(char* id, SArray* fileList){ -// if(strlen(id) >= CHECKPOINT_PATH_LEN){ -// tqError("uploadCheckpoint id name too long, name:%s", id); -// return -1; -// } -// uint8_t* buf = taosMemoryCalloc(1, sizeof(SChekpointDataHeader) + kBlockSize); -// if(buf == NULL){ -// tqError("uploadCheckpoint malloc failed"); -// return -1; -// } -// -// SChekpointDataHeader* pHdr = (SChekpointDataHeader*)buf; -// strcpy(pHdr->id, id); -// -// TdFilePtr fd = NULL; -// for(int i = 0; i < taosArrayGetSize(fileList); i++){ -// char* name = (char*)taosArrayGetP(fileList, i); -// if(strlen(name) >= CHECKPOINT_PATH_LEN){ -// tqError("uploadCheckpoint file name too long, name:%s", name); -// return -1; -// } -// int64_t offset = 0; -// -// fd = taosOpenFile(name, TD_FILE_READ); -// tqDebug("uploadCheckpoint open file %s, file index: %d", name, i); -// -// while(1){ -// int64_t nread = taosPReadFile(fd, buf + sizeof(SChekpointDataHeader), kBlockSize, offset); -// if (nread == -1) { -// taosCloseFile(&fd); -// taosMemoryFree(buf); -// tqError("uploadCheckpoint failed to read file name:%s,reason:%d", name, errno); -// return -1; -// } else if (nread == 0){ -// tqDebug("uploadCheckpoint no data read, close file:%s, move to next file, open and read", name); -// taosCloseFile(&fd); -// break; -// } else if (nread == kBlockSize){ -// offset += nread; -// } else { -// taosCloseFile(&fd); -// offset = 0; -// } -// tqDebug("uploadCheckpoint read file %s, size:%" PRId64 ", current offset:%" PRId64, name, nread, offset); -// -// -// pHdr->size = nread; -// strcpy(pHdr->name, name); -// -// SRpcMsg rpcMsg = {0}; -// int32_t bytes = sizeof(SChekpointDataHeader) + nread; -// rpcMsg.pCont = rpcMallocCont(bytes); -// rpcMsg.msgType = TDMT_SYNC_SNAPSHOT_SEND; -// rpcMsg.contLen = bytes; -// if (rpcMsg.pCont == NULL) { -// tqError("uploadCheckpoint malloc failed"); -// taosCloseFile(&fd); -// taosMemoryFree(buf); -// return -1; -// } -// memcpy(rpcMsg.pCont, buf, bytes); -// int try = 3; -// int32_t code = 0; -// while(try-- > 0){ -// code = tmsgSendReq(pEpSet, &rpcMsg); -// if(code == 0) -// break; -// taosMsleep(10); -// } -// if(code != 0){ -// tqError("uploadCheckpoint send request failed code:%d", code); -// taosCloseFile(&fd); -// taosMemoryFree(buf); -// return -1; -// } -// -// if(offset == 0){ -// break; -// } -// } -// } -// -// taosMemoryFree(buf); - -//} - static int uploadCheckpointToS3(char* id, char* path){ TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) return -1; diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 135431cfb7..dca2e97c28 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -38,13 +38,13 @@ int main(int argc, char **argv) { if (s3Init() < 0) { return -1; } -// strcpy(tsSnodeIp, "127.0.0.1"); + strcpy(tsSnodeAddress, "127.0.0.1"); return RUN_ALL_TESTS(); } TEST(testCase, checkpointUpload_Test) { -// stopRsync(); -// startRsync(); + stopRsync(); + startRsync(); taosSsleep(5); char* id = "2013892036";