From 60ad82d006188f32f8702620108b1ef39e13f8cf Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 17 Nov 2023 09:38:48 +0800 Subject: [PATCH 01/11] fix(cos/log): dump file name & line no --- source/common/CMakeLists.txt | 4 ++-- source/common/src/cos.c | 21 +++++++++++---------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index c7831b2b6d..d3df1345df 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -60,8 +60,8 @@ if(${BUILD_S3}) find_library(S3_LIBRARY s3) find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) find_library(XML2_LIBRARY xml2) - find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) - find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) + find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) target_link_libraries( common diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 4cfc67fa94..fd462d5a11 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -66,11 +66,12 @@ static int should_retry() { return 0; } -static void s3PrintError(const char *func, S3Status status, char error_details[]) { +static void s3PrintError(const char *filename, int lineno, const char *funcname, S3Status status, + char error_details[]) { if (status < S3StatusErrorAccessDenied) { - uError("%s: %s", __func__, S3_get_status_name(status)); + uError("%s/%s:%d-%s: %s", filename, lineno, __func__, funcname, S3_get_status_name(status)); } else { - uError("%s: %s, %s", __func__, S3_get_status_name(status), error_details); + uError("%s/%s:%d-%s: %s, %s", filename, lineno, __func__, funcname, S3_get_status_name(status), error_details); } } @@ -437,7 +438,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan // printListMultipartHeader(data.allDetails); } } else { - s3PrintError(__func__, data.status, data.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg); return -1; } @@ -498,7 +499,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } while (S3_status_is_retryable(data.status) && should_retry()); if (data.status != S3StatusOK) { - s3PrintError(__func__, data.status, data.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg); code = TAOS_SYSTEM_ERROR(EIO); } else if (data.contentLength) { uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, @@ -556,7 +557,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } while (S3_status_is_retryable(manager.status) && should_retry()); if (manager.upload_id == 0 || manager.status != S3StatusOK) { - s3PrintError(__func__, manager.status, manager.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); code = TAOS_SYSTEM_ERROR(EIO); goto clean; } @@ -581,7 +582,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { partContentLength, 0, timeoutMsG, &partData); } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry()); if (partData.put_object_data.status != S3StatusOK) { - s3PrintError(__func__, partData.put_object_data.status, partData.put_object_data.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg); code = TAOS_SYSTEM_ERROR(EIO); goto clean; } @@ -609,7 +610,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { timeoutMsG, &manager); } while (S3_status_is_retryable(manager.status) && should_retry()); if (manager.status != S3StatusOK) { - s3PrintError(__func__, manager.status, manager.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); code = TAOS_SYSTEM_ERROR(EIO); goto clean; } @@ -722,7 +723,7 @@ static SArray *getListByPrefix(const char *prefix) { return data.objectArray; } } else { - s3PrintError(__func__, data.status, data.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg); } taosArrayDestroyEx(data.objectArray, s3FreeObjectKey); @@ -741,7 +742,7 @@ void s3DeleteObjects(const char *object_name[], int nobject) { } while (S3_status_is_retryable(cbd.status) && should_retry()); if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { - s3PrintError(__func__, cbd.status, cbd.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg); } } } From 9d3ffa2c2cc9104599d18653f54f00a2de031609 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 17 Nov 2023 12:24:51 +0800 Subject: [PATCH 02/11] cos/print-error: fix error format --- source/common/src/cos.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index fd462d5a11..684f437d05 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -69,9 +69,9 @@ static int should_retry() { static void s3PrintError(const char *filename, int lineno, const char *funcname, S3Status status, char error_details[]) { if (status < S3StatusErrorAccessDenied) { - uError("%s/%s:%d-%s: %s", filename, lineno, __func__, funcname, S3_get_status_name(status)); + uError("%s/%s:%d-%s: %s", __func__, filename, lineno, funcname, S3_get_status_name(status)); } else { - uError("%s/%s:%d-%s: %s, %s", filename, lineno, __func__, funcname, S3_get_status_name(status), error_details); + uError("%s/%s:%d-%s: %s, %s", __func__, filename, lineno, funcname, S3_get_status_name(status), error_details); } } @@ -885,7 +885,7 @@ long s3Size(const char *object_name) { } while (S3_status_is_retryable(cbd.status) && should_retry()); if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { - uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg); } size = cbd.content_length; From 8038fbaa047229f5472997021ae91645f27f90e1 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 17 Nov 2023 14:58:03 +0800 Subject: [PATCH 03/11] cos/multi: clear manager to init --- source/common/src/cos.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 684f437d05..aecba57e0a 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -233,7 +233,7 @@ typedef struct put_object_callback_data { #define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M -typedef struct UploadManager { +typedef struct { char err_msg[512]; S3Status status; uint64_t content_length; @@ -308,6 +308,7 @@ static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackDat S3Status initial_multipart_callback(const char *upload_id, void *callbackData) { UploadManager *manager = (UploadManager *)callbackData; manager->upload_id = strdup(upload_id); + manager->status = S3StatusOK; return S3StatusOK; } @@ -509,15 +510,15 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } else { uint64_t totalContentLength = contentLength; uint64_t todoContentLength = contentLength; - UploadManager manager; - manager.upload_id = 0; - manager.gb = 0; + UploadManager manager = {0}; + // manager.upload_id = 0; + // manager.gb = 0; // div round up int seq; uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 7; int totalSeq = (contentLength + chunk_size - 1) / chunk_size; - const int max_part_num = 1000; + const int max_part_num = 10000; if (totalSeq > max_part_num) { chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; totalSeq = (contentLength + chunk_size - 1) / chunk_size; From 56badd0db07891b4aeb9b423068dced0252c8dbf Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 17 Nov 2023 15:12:40 +0800 Subject: [PATCH 04/11] cos/chunk-size: reduce to 64m from 768m --- source/common/src/cos.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index aecba57e0a..b4e654c67a 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -231,7 +231,7 @@ typedef struct put_object_callback_data { int noStatus; } put_object_callback_data; -#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M +#define MULTIPART_CHUNK_SIZE (64 << 20) // multipart is 768M typedef struct { char err_msg[512]; @@ -516,7 +516,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { // div round up int seq; - uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 7; + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; int totalSeq = (contentLength + chunk_size - 1) / chunk_size; const int max_part_num = 10000; if (totalSeq > max_part_num) { From b12e5050eae8916cfcd5e40dbec3a665b2fb0a0e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Nov 2023 15:57:46 +0800 Subject: [PATCH 05/11] fix(stream): remove invalid assert --- source/libs/stream/src/streamMeta.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 228da65021..ae8c92d48e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -487,7 +487,6 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) if (ref > 0) { stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { - ASSERT(streamTaskShouldStop(pTask)); stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); tFreeStreamTask(pTask); } else if (ref < 0) { @@ -1071,7 +1070,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; - stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, + stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); streamMetaWLock(pMeta); From abcd36b76de5f47da88a69947a8ea30d1365bb87 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 17 Nov 2023 16:26:22 +0800 Subject: [PATCH 06/11] jenkins: remove 50 --- Jenkinsfile2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 49d5f8884b..8ddee6dbbd 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -401,7 +401,7 @@ pipeline { } } stage('linux test') { - agent{label " slave1_50 || slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 "} + agent{label " slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 "} options { skipDefaultCheckout() } when { changeRequest() From ec9c2bde696d8d9e965a5047c8b432bc0ad9bafa Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 20 Nov 2023 11:38:48 +0800 Subject: [PATCH 07/11] fix(cos/etags): initialize etags to NULL --- source/common/src/cos.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index b4e654c67a..1dca5eda9b 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -537,7 +537,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { S3MultipartCommitHandler commit_handler = { {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; - manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq); + manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *)); manager.next_etags_pos = 0; /* if (uploadId) { @@ -597,6 +597,10 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { char buf[256]; int n; for (i = 0; i < totalSeq; i++) { + if (!manager.etags[i]) { + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } n = snprintf(buf, sizeof(buf), "%d" "%s", From 3912e49309ea99f253f2901886e97d981833e5f6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 11:46:11 +0800 Subject: [PATCH 08/11] enh(stream): add sink_quota column for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5f44d3e7fc..43ba3d2698 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -156,6 +156,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 766e01d955..b3a2a9fa6e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1481,7 +1481,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB if (pShow->pIter == NULL) break; SColumnInfoData *pColInfo; - SName n; int32_t cols = 0; char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; From 7cfa45e95e5c73ec1363f530fa600156a71b67d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 11:59:49 +0800 Subject: [PATCH 09/11] enh(stream): add sink_quota/scan-history-idle-duration column for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 15 +++++++++++++++ source/libs/stream/src/streamCheckpoint.c | 2 +- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 43ba3d2698..a1f8d74571 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -157,6 +157,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b3a2a9fa6e..ee990956ff 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1533,6 +1533,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; + sinkQuota[0] = '0'; + char dstStr[20] = {0}; + STR_TO_VARSTR(dstStr, sinkQuota) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + + char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; + strcpy(scanHistoryIdle, "100a"); + + memset(dstStr, 0, tListLen(dstStr)); + STR_TO_VARSTR(dstStr, scanHistoryIdle) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + numOfRows++; sdbRelease(pSdb, pStream); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 5540e3b6fd..6201329b95 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -299,7 +299,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId && p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; From 7518103b96a114df3455cfc398e1ee2ddc1aeb35 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 13:06:12 +0800 Subject: [PATCH 10/11] fix(test): adjust test case accordingly. --- tests/system-test/0-others/information_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index ac952d383a..2bfe33d0af 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -217,7 +217,7 @@ class TDTestCase: tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkEqual(208, len(tdSql.queryResult)) + tdSql.checkEqual(210, len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) From d12bc412369467549565bec4819c1f9c355221a7 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 20 Nov 2023 14:30:25 +0800 Subject: [PATCH 11/11] tsdb/merge: clear file set before return 0 --- source/dnode/vnode/src/tsdb/tsdbMerge.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index ef6aba1324..7babaa6e28 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -576,7 +576,10 @@ static int32_t tsdbMerge(void *arg) { } } - if (skipMerge) return 0; + if (skipMerge) { + code = 0; + goto _exit; + } // do merge tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);