fix:error

This commit is contained in:
wangmm0220 2023-11-08 15:01:04 +08:00
parent 5ed7b8aab3
commit c4c1423243
3 changed files with 4 additions and 101 deletions

View File

@ -140,17 +140,12 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
//#define CHECKPOINT_PATH_LEN 128
//typedef struct SChekpointDataHeader{
// int64_t size;
// char name[CHECKPOINT_PATH_LEN];
// char id[CHECKPOINT_PATH_LEN];
//} SChekpointDataHeader;
typedef enum UPLOAD_TYPE{ typedef enum UPLOAD_TYPE{
UPLOAD_DISABLE = -1, UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0, UPLOAD_S3 = 0,
UPLOAD_RSYNC = 1, UPLOAD_RSYNC = 1,
} UPLOAD_TYPE; } UPLOAD_TYPE;
UPLOAD_TYPE getUploadType(); UPLOAD_TYPE getUploadType();
int uploadCheckpoint(char* id, char* path); int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path);

View File

@ -375,98 +375,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code; return code;
} }
//static int64_t kBlockSize = 64 * 1024;
//static int sendCheckpointToS3(char* id, SArray* fileList){
// code = s3PutObjectFromFile2(from->fname, object_name);
// return 0;
//}
//static int sendCheckpointToSnode(char* id, SArray* fileList){
// if(strlen(id) >= CHECKPOINT_PATH_LEN){
// tqError("uploadCheckpoint id name too long, name:%s", id);
// return -1;
// }
// uint8_t* buf = taosMemoryCalloc(1, sizeof(SChekpointDataHeader) + kBlockSize);
// if(buf == NULL){
// tqError("uploadCheckpoint malloc failed");
// return -1;
// }
//
// SChekpointDataHeader* pHdr = (SChekpointDataHeader*)buf;
// strcpy(pHdr->id, id);
//
// TdFilePtr fd = NULL;
// for(int i = 0; i < taosArrayGetSize(fileList); i++){
// char* name = (char*)taosArrayGetP(fileList, i);
// if(strlen(name) >= CHECKPOINT_PATH_LEN){
// tqError("uploadCheckpoint file name too long, name:%s", name);
// return -1;
// }
// int64_t offset = 0;
//
// fd = taosOpenFile(name, TD_FILE_READ);
// tqDebug("uploadCheckpoint open file %s, file index: %d", name, i);
//
// while(1){
// int64_t nread = taosPReadFile(fd, buf + sizeof(SChekpointDataHeader), kBlockSize, offset);
// if (nread == -1) {
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// tqError("uploadCheckpoint failed to read file name:%s,reason:%d", name, errno);
// return -1;
// } else if (nread == 0){
// tqDebug("uploadCheckpoint no data read, close file:%s, move to next file, open and read", name);
// taosCloseFile(&fd);
// break;
// } else if (nread == kBlockSize){
// offset += nread;
// } else {
// taosCloseFile(&fd);
// offset = 0;
// }
// tqDebug("uploadCheckpoint read file %s, size:%" PRId64 ", current offset:%" PRId64, name, nread, offset);
//
//
// pHdr->size = nread;
// strcpy(pHdr->name, name);
//
// SRpcMsg rpcMsg = {0};
// int32_t bytes = sizeof(SChekpointDataHeader) + nread;
// rpcMsg.pCont = rpcMallocCont(bytes);
// rpcMsg.msgType = TDMT_SYNC_SNAPSHOT_SEND;
// rpcMsg.contLen = bytes;
// if (rpcMsg.pCont == NULL) {
// tqError("uploadCheckpoint malloc failed");
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// return -1;
// }
// memcpy(rpcMsg.pCont, buf, bytes);
// int try = 3;
// int32_t code = 0;
// while(try-- > 0){
// code = tmsgSendReq(pEpSet, &rpcMsg);
// if(code == 0)
// break;
// taosMsleep(10);
// }
// if(code != 0){
// tqError("uploadCheckpoint send request failed code:%d", code);
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// return -1;
// }
//
// if(offset == 0){
// break;
// }
// }
// }
//
// taosMemoryFree(buf);
//}
static int uploadCheckpointToS3(char* id, char* path){ static int uploadCheckpointToS3(char* id, char* path){
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1; if (pDir == NULL) return -1;

View File

@ -38,13 +38,13 @@ int main(int argc, char **argv) {
if (s3Init() < 0) { if (s3Init() < 0) {
return -1; return -1;
} }
// strcpy(tsSnodeIp, "127.0.0.1"); strcpy(tsSnodeAddress, "127.0.0.1");
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
TEST(testCase, checkpointUpload_Test) { TEST(testCase, checkpointUpload_Test) {
// stopRsync(); stopRsync();
// startRsync(); startRsync();
taosSsleep(5); taosSsleep(5);
char* id = "2013892036"; char* id = "2013892036";