Merge branch '3.0' into enh/refactorBackend
This commit is contained in:
commit
939aa7a3cc
|
@ -401,7 +401,7 @@ pipeline {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stage('linux test') {
|
stage('linux test') {
|
||||||
agent{label " slave1_50 || slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 "}
|
agent{label " slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 "}
|
||||||
options { skipDefaultCheckout() }
|
options { skipDefaultCheckout() }
|
||||||
when {
|
when {
|
||||||
changeRequest()
|
changeRequest()
|
||||||
|
|
|
@ -60,8 +60,8 @@ if(${BUILD_S3})
|
||||||
find_library(S3_LIBRARY s3)
|
find_library(S3_LIBRARY s3)
|
||||||
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||||
find_library(XML2_LIBRARY xml2)
|
find_library(XML2_LIBRARY xml2)
|
||||||
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
|
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||||
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
|
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
common
|
common
|
||||||
|
|
||||||
|
|
|
@ -66,11 +66,12 @@ static int should_retry() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void s3PrintError(const char *func, S3Status status, char error_details[]) {
|
static void s3PrintError(const char *filename, int lineno, const char *funcname, S3Status status,
|
||||||
|
char error_details[]) {
|
||||||
if (status < S3StatusErrorAccessDenied) {
|
if (status < S3StatusErrorAccessDenied) {
|
||||||
uError("%s: %s", __func__, S3_get_status_name(status));
|
uError("%s/%s:%d-%s: %s", __func__, filename, lineno, funcname, S3_get_status_name(status));
|
||||||
} else {
|
} else {
|
||||||
uError("%s: %s, %s, %d", __func__, S3_get_status_name(status), error_details, status);
|
uError("%s/%s:%d-%s: %s, %s", __func__, filename, lineno, funcname, S3_get_status_name(status), error_details);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,9 +231,9 @@ typedef struct put_object_callback_data {
|
||||||
int noStatus;
|
int noStatus;
|
||||||
} put_object_callback_data;
|
} put_object_callback_data;
|
||||||
|
|
||||||
#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M
|
#define MULTIPART_CHUNK_SIZE (64 << 20) // multipart is 768M
|
||||||
|
|
||||||
typedef struct UploadManager {
|
typedef struct {
|
||||||
char err_msg[512];
|
char err_msg[512];
|
||||||
S3Status status;
|
S3Status status;
|
||||||
uint64_t content_length;
|
uint64_t content_length;
|
||||||
|
@ -307,6 +308,7 @@ static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackDat
|
||||||
S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
|
S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
|
||||||
UploadManager *manager = (UploadManager *)callbackData;
|
UploadManager *manager = (UploadManager *)callbackData;
|
||||||
manager->upload_id = strdup(upload_id);
|
manager->upload_id = strdup(upload_id);
|
||||||
|
manager->status = S3StatusOK;
|
||||||
return S3StatusOK;
|
return S3StatusOK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -437,7 +439,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
|
||||||
// printListMultipartHeader(data.allDetails);
|
// printListMultipartHeader(data.allDetails);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
s3PrintError(__func__, data.status, data.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -499,7 +501,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
} while (S3_status_is_retryable(data.status) && should_retry());
|
} while (S3_status_is_retryable(data.status) && should_retry());
|
||||||
|
|
||||||
if (data.status != S3StatusOK) {
|
if (data.status != S3StatusOK) {
|
||||||
s3PrintError(__func__, data.status, data.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
} else if (data.contentLength) {
|
} else if (data.contentLength) {
|
||||||
uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
|
uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
|
||||||
|
@ -509,15 +511,15 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
} else {
|
} else {
|
||||||
uint64_t totalContentLength = contentLength;
|
uint64_t totalContentLength = contentLength;
|
||||||
uint64_t todoContentLength = contentLength;
|
uint64_t todoContentLength = contentLength;
|
||||||
UploadManager manager;
|
UploadManager manager = {0};
|
||||||
manager.upload_id = 0;
|
// manager.upload_id = 0;
|
||||||
manager.gb = 0;
|
// manager.gb = 0;
|
||||||
|
|
||||||
// div round up
|
// div round up
|
||||||
int seq;
|
int seq;
|
||||||
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 7;
|
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
|
||||||
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
|
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
|
||||||
const int max_part_num = 1000;
|
const int max_part_num = 10000;
|
||||||
if (totalSeq > max_part_num) {
|
if (totalSeq > max_part_num) {
|
||||||
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
|
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
|
||||||
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
|
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
|
||||||
|
@ -536,7 +538,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
S3MultipartCommitHandler commit_handler = {
|
S3MultipartCommitHandler commit_handler = {
|
||||||
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
|
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
|
||||||
|
|
||||||
manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq);
|
manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
|
||||||
manager.next_etags_pos = 0;
|
manager.next_etags_pos = 0;
|
||||||
/*
|
/*
|
||||||
if (uploadId) {
|
if (uploadId) {
|
||||||
|
@ -557,7 +559,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
} while (S3_status_is_retryable(manager.status) && should_retry());
|
} while (S3_status_is_retryable(manager.status) && should_retry());
|
||||||
|
|
||||||
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
|
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
|
||||||
s3PrintError(__func__, manager.status, manager.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
goto clean;
|
goto clean;
|
||||||
}
|
}
|
||||||
|
@ -582,7 +584,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
partContentLength, 0, timeoutMsG, &partData);
|
partContentLength, 0, timeoutMsG, &partData);
|
||||||
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
|
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
|
||||||
if (partData.put_object_data.status != S3StatusOK) {
|
if (partData.put_object_data.status != S3StatusOK) {
|
||||||
s3PrintError(__func__, partData.put_object_data.status, partData.put_object_data.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
goto clean;
|
goto clean;
|
||||||
}
|
}
|
||||||
|
@ -596,6 +598,10 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
char buf[256];
|
char buf[256];
|
||||||
int n;
|
int n;
|
||||||
for (i = 0; i < totalSeq; i++) {
|
for (i = 0; i < totalSeq; i++) {
|
||||||
|
if (!manager.etags[i]) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
|
goto clean;
|
||||||
|
}
|
||||||
n = snprintf(buf, sizeof(buf),
|
n = snprintf(buf, sizeof(buf),
|
||||||
"<Part><PartNumber>%d</PartNumber>"
|
"<Part><PartNumber>%d</PartNumber>"
|
||||||
"<ETag>%s</ETag></Part>",
|
"<ETag>%s</ETag></Part>",
|
||||||
|
@ -610,7 +616,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
timeoutMsG, &manager);
|
timeoutMsG, &manager);
|
||||||
} while (S3_status_is_retryable(manager.status) && should_retry());
|
} while (S3_status_is_retryable(manager.status) && should_retry());
|
||||||
if (manager.status != S3StatusOK) {
|
if (manager.status != S3StatusOK) {
|
||||||
s3PrintError(__func__, manager.status, manager.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
goto clean;
|
goto clean;
|
||||||
}
|
}
|
||||||
|
@ -723,7 +729,7 @@ static SArray *getListByPrefix(const char *prefix) {
|
||||||
return data.objectArray;
|
return data.objectArray;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
s3PrintError(__func__, data.status, data.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
|
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
|
||||||
|
@ -742,7 +748,7 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
|
||||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||||
|
|
||||||
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
||||||
s3PrintError(__func__, cbd.status, cbd.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -885,7 +891,7 @@ long s3Size(const char *object_name) {
|
||||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||||
|
|
||||||
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
||||||
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
size = cbd.content_length;
|
size = cbd.content_length;
|
||||||
|
|
|
@ -156,6 +156,8 @@ static const SSysDbTableSchema streamSchema[] = {
|
||||||
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema streamTaskSchema[] = {
|
static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
|
|
|
@ -1461,7 +1461,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
SColumnInfoData *pColInfo;
|
SColumnInfoData *pColInfo;
|
||||||
SName n;
|
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
@ -1514,6 +1513,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
|
||||||
|
|
||||||
|
char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
sinkQuota[0] = '0';
|
||||||
|
char dstStr[20] = {0};
|
||||||
|
STR_TO_VARSTR(dstStr, sinkQuota)
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
|
||||||
|
|
||||||
|
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
strcpy(scanHistoryIdle, "100a");
|
||||||
|
|
||||||
|
memset(dstStr, 0, tListLen(dstStr));
|
||||||
|
STR_TO_VARSTR(dstStr, scanHistoryIdle)
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -576,7 +576,10 @@ static int32_t tsdbMerge(void *arg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (skipMerge) return 0;
|
if (skipMerge) {
|
||||||
|
code = 0;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
// do merge
|
// do merge
|
||||||
tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
|
tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
|
||||||
|
|
|
@ -294,7 +294,6 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
|
||||||
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
||||||
|
|
||||||
streamTaskClearCheckInfo(p);
|
streamTaskClearCheckInfo(p);
|
||||||
|
|
||||||
char* str = NULL;
|
char* str = NULL;
|
||||||
streamTaskGetStatus(p, &str);
|
streamTaskGetStatus(p, &str);
|
||||||
|
|
||||||
|
|
|
@ -650,7 +650,6 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask)
|
||||||
if (ref > 0) {
|
if (ref > 0) {
|
||||||
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
|
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
|
||||||
} else if (ref == 0) {
|
} else if (ref == 0) {
|
||||||
ASSERT(streamTaskShouldStop(pTask));
|
|
||||||
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
|
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
} else if (ref < 0) {
|
} else if (ref < 0) {
|
||||||
|
@ -1239,7 +1238,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
||||||
void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
|
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId,
|
||||||
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
|
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
|
@ -217,7 +217,7 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
||||||
tdSql.checkEqual(208, len(tdSql.queryResult))
|
tdSql.checkEqual(210, len(tdSql.queryResult))
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
||||||
tdSql.checkEqual(54, len(tdSql.queryResult))
|
tdSql.checkEqual(54, len(tdSql.queryResult))
|
||||||
|
|
Loading…
Reference in New Issue