Merge branch '3.0' into enh/refactorBackend
This commit is contained in:
commit
3e2314b09b
|
@ -39,8 +39,8 @@ void s3DeleteObjectsByPrefix(const char *prefix);
|
||||||
void s3DeleteObjects(const char *object_name[], int nobject);
|
void s3DeleteObjects(const char *object_name[], int nobject);
|
||||||
bool s3Exists(const char *object_name);
|
bool s3Exists(const char *object_name);
|
||||||
bool s3Get(const char *object_name, const char *path);
|
bool s3Get(const char *object_name, const char *path);
|
||||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path);
|
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock);
|
||||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
|
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
|
||||||
void s3EvictCache(const char *path, long object_size);
|
void s3EvictCache(const char *path, long object_size);
|
||||||
long s3Size(const char *object_name);
|
long s3Size(const char *object_name);
|
||||||
|
|
||||||
|
|
|
@ -388,6 +388,7 @@ typedef struct SLastRowScanPhysiNode {
|
||||||
SNodeList* pGroupTags;
|
SNodeList* pGroupTags;
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
bool ignoreNull;
|
bool ignoreNull;
|
||||||
|
SNodeList* pTargets;
|
||||||
} SLastRowScanPhysiNode;
|
} SLastRowScanPhysiNode;
|
||||||
|
|
||||||
typedef SLastRowScanPhysiNode STableCountScanPhysiNode;
|
typedef SLastRowScanPhysiNode STableCountScanPhysiNode;
|
||||||
|
|
|
@ -25,7 +25,7 @@ static S3UriStyle uriStyleG = S3UriStylePath;
|
||||||
static int retriesG = 5;
|
static int retriesG = 5;
|
||||||
static int timeoutMsG = 0;
|
static int timeoutMsG = 0;
|
||||||
|
|
||||||
static int32_t s3Begin() {
|
int32_t s3Begin() {
|
||||||
S3Status status;
|
S3Status status;
|
||||||
const char *hostname = tsS3Hostname;
|
const char *hostname = tsS3Hostname;
|
||||||
const char *env_hn = getenv("S3_HOSTNAME");
|
const char *env_hn = getenv("S3_HOSTNAME");
|
||||||
|
@ -44,10 +44,12 @@ static int32_t s3Begin() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void s3End() { S3_deinitialize(); }
|
void s3End() { S3_deinitialize(); }
|
||||||
int32_t s3Init() { return s3Begin(); }
|
|
||||||
|
|
||||||
void s3CleanUp() { s3End(); }
|
int32_t s3Init() { return 0; /*s3Begin();*/ }
|
||||||
|
|
||||||
|
void s3CleanUp() { /*s3End();*/
|
||||||
|
}
|
||||||
|
|
||||||
static int should_retry() {
|
static int should_retry() {
|
||||||
/*
|
/*
|
||||||
|
@ -73,8 +75,9 @@ static void s3PrintError(const char *func, S3Status status, char error_details[]
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char err_msg[128];
|
char err_msg[512];
|
||||||
S3Status status;
|
S3Status status;
|
||||||
|
uint64_t content_length;
|
||||||
TdFilePtr file;
|
TdFilePtr file;
|
||||||
} TS3GetData;
|
} TS3GetData;
|
||||||
|
|
||||||
|
@ -83,10 +86,11 @@ typedef struct {
|
||||||
S3Status status;
|
S3Status status;
|
||||||
uint64_t content_length;
|
uint64_t content_length;
|
||||||
char *buf;
|
char *buf;
|
||||||
|
int64_t buf_pos;
|
||||||
} TS3SizeCBD;
|
} TS3SizeCBD;
|
||||||
|
|
||||||
static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) {
|
static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) {
|
||||||
// (void)callbackData;
|
// (void)callbackData;
|
||||||
return S3StatusOK;
|
return S3StatusOK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,20 +113,22 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro
|
||||||
int len = 0;
|
int len = 0;
|
||||||
const int elen = sizeof(cbd->err_msg);
|
const int elen = sizeof(cbd->err_msg);
|
||||||
if (error) {
|
if (error) {
|
||||||
if (error->message) {
|
if (error->message && elen - len > 0) {
|
||||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message);
|
len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message);
|
||||||
}
|
}
|
||||||
if (error->resource) {
|
if (error->resource && elen - len > 0) {
|
||||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource);
|
len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource);
|
||||||
}
|
}
|
||||||
if (error->furtherDetails) {
|
if (error->furtherDetails && elen - len > 0) {
|
||||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails);
|
len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails);
|
||||||
}
|
}
|
||||||
if (error->extraDetailsCount) {
|
if (error->extraDetailsCount && elen - len > 0) {
|
||||||
len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n");
|
len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n");
|
||||||
for (int i = 0; i < error->extraDetailsCount; i++) {
|
for (int i = 0; i < error->extraDetailsCount; i++) {
|
||||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name,
|
if (elen - len > 0) {
|
||||||
error->extraDetails[i].value);
|
len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name,
|
||||||
|
error->extraDetails[i].value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -213,8 +219,9 @@ static void growbuffer_destroy(growbuffer *gb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct put_object_callback_data {
|
typedef struct put_object_callback_data {
|
||||||
char err_msg[128];
|
char err_msg[512];
|
||||||
S3Status status;
|
S3Status status;
|
||||||
|
uint64_t content_length;
|
||||||
// FILE *infile;
|
// FILE *infile;
|
||||||
TdFilePtr infileFD;
|
TdFilePtr infileFD;
|
||||||
growbuffer *gb;
|
growbuffer *gb;
|
||||||
|
@ -226,8 +233,9 @@ typedef struct put_object_callback_data {
|
||||||
#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M
|
#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M
|
||||||
|
|
||||||
typedef struct UploadManager {
|
typedef struct UploadManager {
|
||||||
char err_msg[128];
|
char err_msg[512];
|
||||||
S3Status status;
|
S3Status status;
|
||||||
|
uint64_t content_length;
|
||||||
// used for initial multipart
|
// used for initial multipart
|
||||||
char *upload_id;
|
char *upload_id;
|
||||||
|
|
||||||
|
@ -241,8 +249,9 @@ typedef struct UploadManager {
|
||||||
} UploadManager;
|
} UploadManager;
|
||||||
|
|
||||||
typedef struct list_parts_callback_data {
|
typedef struct list_parts_callback_data {
|
||||||
char err_msg[128];
|
char err_msg[512];
|
||||||
S3Status status;
|
S3Status status;
|
||||||
|
uint64_t content_length;
|
||||||
int isTruncated;
|
int isTruncated;
|
||||||
char nextPartNumberMarker[24];
|
char nextPartNumberMarker[24];
|
||||||
char initiatorId[256];
|
char initiatorId[256];
|
||||||
|
@ -258,7 +267,7 @@ typedef struct list_parts_callback_data {
|
||||||
} list_parts_callback_data;
|
} list_parts_callback_data;
|
||||||
|
|
||||||
typedef struct MultipartPartData {
|
typedef struct MultipartPartData {
|
||||||
char err_msg[128];
|
char err_msg[512];
|
||||||
S3Status status;
|
S3Status status;
|
||||||
put_object_callback_data put_object_data;
|
put_object_callback_data put_object_data;
|
||||||
int seq;
|
int seq;
|
||||||
|
@ -402,7 +411,8 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
|
||||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||||
0, awsRegionG};
|
0, awsRegionG};
|
||||||
|
|
||||||
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listPartsCallback};
|
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
|
||||||
|
&listPartsCallback};
|
||||||
|
|
||||||
list_parts_callback_data data;
|
list_parts_callback_data data;
|
||||||
|
|
||||||
|
@ -621,7 +631,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct list_bucket_callback_data {
|
typedef struct list_bucket_callback_data {
|
||||||
char err_msg[128];
|
char err_msg[512];
|
||||||
S3Status status;
|
S3Status status;
|
||||||
int isTruncated;
|
int isTruncated;
|
||||||
char nextMarker[1024];
|
char nextMarker[1024];
|
||||||
|
@ -670,7 +680,7 @@ static void s3FreeObjectKey(void *pItem) {
|
||||||
taosMemoryFree(key);
|
taosMemoryFree(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* getListByPrefix(const char *prefix){
|
static SArray *getListByPrefix(const char *prefix) {
|
||||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||||
0, awsRegionG};
|
0, awsRegionG};
|
||||||
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
|
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
|
||||||
|
@ -679,7 +689,7 @@ static SArray* getListByPrefix(const char *prefix){
|
||||||
const char *marker = 0, *delimiter = 0;
|
const char *marker = 0, *delimiter = 0;
|
||||||
int maxkeys = 0, allDetails = 0;
|
int maxkeys = 0, allDetails = 0;
|
||||||
list_bucket_callback_data data;
|
list_bucket_callback_data data;
|
||||||
data.objectArray = taosArrayInit(32, sizeof(void*));
|
data.objectArray = taosArrayInit(32, sizeof(void *));
|
||||||
if (!data.objectArray) {
|
if (!data.objectArray) {
|
||||||
uError("%s: %s", __func__, "out of memoty");
|
uError("%s: %s", __func__, "out of memoty");
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -731,23 +741,27 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void s3DeleteObjectsByPrefix(const char *prefix) {
|
void s3DeleteObjectsByPrefix(const char *prefix) {
|
||||||
SArray* objectArray = getListByPrefix(prefix);
|
SArray *objectArray = getListByPrefix(prefix);
|
||||||
if(objectArray == NULL)return;
|
if (objectArray == NULL) return;
|
||||||
s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
|
s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
|
||||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
|
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||||
TS3SizeCBD *cbd = callbackData;
|
TS3SizeCBD *cbd = callbackData;
|
||||||
|
/*
|
||||||
if (cbd->content_length != bufferSize) {
|
if (cbd->content_length != bufferSize) {
|
||||||
cbd->status = S3StatusAbortedByCallback;
|
cbd->status = S3StatusAbortedByCallback;
|
||||||
return S3StatusAbortedByCallback;
|
return S3StatusAbortedByCallback;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
if (!cbd->buf) {
|
||||||
|
cbd->buf = taosMemoryCalloc(1, cbd->content_length);
|
||||||
|
}
|
||||||
|
|
||||||
char *buf = taosMemoryCalloc(1, bufferSize);
|
if (cbd->buf) {
|
||||||
if (buf) {
|
memcpy(cbd->buf + cbd->buf_pos, buffer, bufferSize);
|
||||||
memcpy(buf, buffer, bufferSize);
|
cbd->buf_pos += bufferSize;
|
||||||
cbd->buf = buf;
|
|
||||||
cbd->status = S3StatusOK;
|
cbd->status = S3StatusOK;
|
||||||
return S3StatusOK;
|
return S3StatusOK;
|
||||||
} else {
|
} else {
|
||||||
|
@ -756,7 +770,7 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) {
|
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
||||||
int status = 0;
|
int status = 0;
|
||||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||||
|
@ -769,6 +783,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
||||||
|
|
||||||
TS3SizeCBD cbd = {0};
|
TS3SizeCBD cbd = {0};
|
||||||
cbd.content_length = size;
|
cbd.content_length = size;
|
||||||
|
cbd.buf_pos = 0;
|
||||||
do {
|
do {
|
||||||
S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd);
|
S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd);
|
||||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||||
|
@ -778,19 +793,23 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
||||||
return TAOS_SYSTEM_ERROR(EIO);
|
return TAOS_SYSTEM_ERROR(EIO);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (check && cbd.buf_pos != size) {
|
||||||
|
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
||||||
|
return TAOS_SYSTEM_ERROR(EIO);
|
||||||
|
}
|
||||||
|
|
||||||
*ppBlock = cbd.buf;
|
*ppBlock = cbd.buf;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
|
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||||
TS3GetData *cbd = (TS3GetData *) callbackData;
|
TS3GetData *cbd = (TS3GetData *)callbackData;
|
||||||
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
|
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
|
||||||
return ((wrote < (size_t) bufferSize) ?
|
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
|
||||||
S3StatusAbortedByCallback : S3StatusOK);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3GetObjectToFile(const char *object_name, char* fileName) {
|
int32_t s3GetObjectToFile(const char *object_name, char *fileName) {
|
||||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||||
|
|
||||||
|
@ -821,21 +840,21 @@ int32_t s3GetObjectToFile(const char *object_name, char* fileName) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){
|
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
|
||||||
SArray* objectArray = getListByPrefix(prefix);
|
SArray *objectArray = getListByPrefix(prefix);
|
||||||
if(objectArray == NULL) return -1;
|
if (objectArray == NULL) return -1;
|
||||||
|
|
||||||
for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){
|
for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) {
|
||||||
char* object = taosArrayGetP(objectArray, i);
|
char *object = taosArrayGetP(objectArray, i);
|
||||||
const char* tmp = strchr(object, '/');
|
const char *tmp = strchr(object, '/');
|
||||||
tmp = (tmp == NULL) ? object : tmp + 1;
|
tmp = (tmp == NULL) ? object : tmp + 1;
|
||||||
char fileName[PATH_MAX] = {0};
|
char fileName[PATH_MAX] = {0};
|
||||||
if(path[strlen(path) - 1] != TD_DIRSEP_CHAR){
|
if (path[strlen(path) - 1] != TD_DIRSEP_CHAR) {
|
||||||
snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp);
|
snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp);
|
||||||
}else{
|
} else {
|
||||||
snprintf(fileName, PATH_MAX, "%s%s", path, tmp);
|
snprintf(fileName, PATH_MAX, "%s%s", path, tmp);
|
||||||
}
|
}
|
||||||
if(s3GetObjectToFile(object, fileName) != 0){
|
if (s3GetObjectToFile(object, fileName) != 0) {
|
||||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1122,7 +1141,8 @@ bool s3Get(const char *object_name, const char *path) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) {
|
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) {
|
||||||
|
(void)check;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
cos_pool_t *p = NULL;
|
cos_pool_t *p = NULL;
|
||||||
int is_cname = 0;
|
int is_cname = 0;
|
||||||
|
@ -1314,9 +1334,11 @@ void s3DeleteObjectsByPrefix(const char *prefix) {}
|
||||||
void s3DeleteObjects(const char *object_name[], int nobject) {}
|
void s3DeleteObjects(const char *object_name[], int nobject) {}
|
||||||
bool s3Exists(const char *object_name) { return false; }
|
bool s3Exists(const char *object_name) { return false; }
|
||||||
bool s3Get(const char *object_name, const char *path) { return false; }
|
bool s3Get(const char *object_name, const char *path) { return false; }
|
||||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path) { return 0; }
|
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
||||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; }
|
return 0;
|
||||||
|
}
|
||||||
void s3EvictCache(const char *path, long object_size) {}
|
void s3EvictCache(const char *path, long object_size) {}
|
||||||
long s3Size(const char *object_name) { return 0; }
|
long s3Size(const char *object_name) { return 0; }
|
||||||
|
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -749,7 +749,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||||
return -1;
|
return -1;
|
||||||
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||||
0)
|
0)
|
||||||
|
@ -1693,6 +1693,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
|
||||||
{"ttlBatchDropNum", &tsTtlBatchDropNum},
|
{"ttlBatchDropNum", &tsTtlBatchDropNum},
|
||||||
{"ttlFlushThreshold", &tsTtlFlushThreshold},
|
{"ttlFlushThreshold", &tsTtlFlushThreshold},
|
||||||
{"ttlPushInterval", &tsTtlPushIntervalSec},
|
{"ttlPushInterval", &tsTtlPushIntervalSec},
|
||||||
|
//{"s3BlockSize", &tsS3BlockSize},
|
||||||
{"s3BlockCacheSize", &tsS3BlockCacheSize},
|
{"s3BlockCacheSize", &tsS3BlockCacheSize},
|
||||||
{"s3PageCacheSize", &tsS3PageCacheSize},
|
{"s3PageCacheSize", &tsS3PageCacheSize},
|
||||||
{"s3UploadDelaySec", &tsS3UploadDelaySec},
|
{"s3UploadDelaySec", &tsS3UploadDelaySec},
|
||||||
|
|
|
@ -3,6 +3,17 @@ add_library(dnode STATIC ${IMPLEMENT_SRC})
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
dnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode mgmt_dnode
|
dnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode mgmt_dnode
|
||||||
)
|
)
|
||||||
|
|
||||||
|
IF (TD_STORAGE)
|
||||||
|
|
||||||
|
IF(${BUILD_WITH_S3})
|
||||||
|
add_definitions(-DUSE_S3)
|
||||||
|
ELSEIF(${BUILD_WITH_COS})
|
||||||
|
add_definitions(-DUSE_COS)
|
||||||
|
ENDIF()
|
||||||
|
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
dnode
|
dnode
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
|
|
@ -18,17 +18,17 @@
|
||||||
#include "audit.h"
|
#include "audit.h"
|
||||||
#include "libs/function/tudf.h"
|
#include "libs/function/tudf.h"
|
||||||
|
|
||||||
#define DM_INIT_AUDIT() \
|
#define DM_INIT_AUDIT() \
|
||||||
do { \
|
do { \
|
||||||
auditCfg.port = tsMonitorPort; \
|
auditCfg.port = tsMonitorPort; \
|
||||||
auditCfg.server = tsMonitorFqdn; \
|
auditCfg.server = tsMonitorFqdn; \
|
||||||
auditCfg.comp = tsMonitorComp; \
|
auditCfg.comp = tsMonitorComp; \
|
||||||
if (auditInit(&auditCfg) != 0) { \
|
if (auditInit(&auditCfg) != 0) { \
|
||||||
return -1; \
|
return -1; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
static SDnode globalDnode = {0};
|
static SDnode globalDnode = {0};
|
||||||
|
|
||||||
SDnode *dmInstance() { return &globalDnode; }
|
SDnode *dmInstance() { return &globalDnode; }
|
||||||
|
|
||||||
|
@ -146,6 +146,13 @@ static bool dmCheckDataDirVersion() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(USE_S3)
|
||||||
|
|
||||||
|
extern int32_t s3Begin();
|
||||||
|
extern void s3End();
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t dmInit() {
|
int32_t dmInit() {
|
||||||
dInfo("start to init dnode env");
|
dInfo("start to init dnode env");
|
||||||
if (dmDiskInit() != 0) return -1;
|
if (dmDiskInit() != 0) return -1;
|
||||||
|
@ -156,6 +163,9 @@ int32_t dmInit() {
|
||||||
if (dmInitMonitor() != 0) return -1;
|
if (dmInitMonitor() != 0) return -1;
|
||||||
if (dmInitAudit() != 0) return -1;
|
if (dmInitAudit() != 0) return -1;
|
||||||
if (dmInitDnode(dmInstance()) != 0) return -1;
|
if (dmInitDnode(dmInstance()) != 0) return -1;
|
||||||
|
#if defined(USE_S3)
|
||||||
|
if (s3Begin() != 0) return -1;
|
||||||
|
#endif
|
||||||
|
|
||||||
dInfo("dnode env is initialized");
|
dInfo("dnode env is initialized");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -181,6 +191,9 @@ void dmCleanup() {
|
||||||
udfStopUdfd();
|
udfStopUdfd();
|
||||||
taosStopCacheRefreshWorker();
|
taosStopCacheRefreshWorker();
|
||||||
dmDiskClose();
|
dmDiskClose();
|
||||||
|
#if defined(USE_S3)
|
||||||
|
s3End();
|
||||||
|
#endif
|
||||||
dInfo("dnode env is cleaned up");
|
dInfo("dnode env is cleaned up");
|
||||||
|
|
||||||
taosCleanupCfg();
|
taosCleanupCfg();
|
||||||
|
@ -265,19 +278,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
pWrapper = &pDnode->wrappers[ntype];
|
pWrapper = &pDnode->wrappers[ntype];
|
||||||
|
|
||||||
if(pWrapper->func.nodeRoleFp != NULL){
|
if (pWrapper->func.nodeRoleFp != NULL) {
|
||||||
ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
|
ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
|
||||||
dInfo("node:%s, checking node role:%d", pWrapper->name, role);
|
dInfo("node:%s, checking node role:%d", pWrapper->name, role);
|
||||||
if(role == TAOS_SYNC_ROLE_VOTER){
|
if (role == TAOS_SYNC_ROLE_VOTER) {
|
||||||
dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
|
dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
|
||||||
terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
|
terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pWrapper->func.isCatchUpFp != NULL){
|
if (pWrapper->func.isCatchUpFp != NULL) {
|
||||||
dInfo("node:%s, checking node catch up", pWrapper->name);
|
dInfo("node:%s, checking node catch up", pWrapper->name);
|
||||||
if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){
|
if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
|
||||||
terrno = TSDB_CODE_MNODE_NOT_CATCH_UP;
|
terrno = TSDB_CODE_MNODE_NOT_CATCH_UP;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -394,7 +407,4 @@ void dmReportStartup(const char *pName, const char *pDesc) {
|
||||||
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
|
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t dmGetClusterId() {
|
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
|
||||||
return globalDnode.data.clusterId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -1310,6 +1310,22 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
tFreeSMCfgDnodeReq(&cfgReq);
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
return 0;
|
return 0;
|
||||||
|
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
||||||
|
int32_t optLen = strlen("s3blocksize");
|
||||||
|
int32_t flag = -1;
|
||||||
|
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
if (flag > 1024 * 1024 || (flag > -1 && flag < 4) || flag < -1) {
|
||||||
|
mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [4, 1024 * 1024]",
|
||||||
|
cfgReq.dnodeId, flag);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, "s3blocksize");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
mndMCfg2DCfg(&cfgReq, &dcfgReq);
|
mndMCfg2DCfg(&cfgReq, &dcfgReq);
|
||||||
|
|
|
@ -898,6 +898,21 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t mndStreamGenChkpId(SMnode *pMnode) {
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
void * pIter = NULL;
|
||||||
|
SSdb * pSdb = pMnode->pSdb;
|
||||||
|
int64_t maxChkpId = 0;
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
}
|
||||||
|
return maxChkpId + 1;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb * pSdb = pMnode->pSdb;
|
SSdb * pSdb = pMnode->pSdb;
|
||||||
|
@ -906,7 +921,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
||||||
pMsg->checkpointId = taosGetTimestampMs();
|
pMsg->checkpointId = mndStreamGenChkpId(pMnode);
|
||||||
|
|
||||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||||
|
|
|
@ -3099,7 +3099,7 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||||
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
|
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// taosMemoryFree(pBlock);
|
// taosMemoryFree(pBlock);
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -30,10 +30,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
||||||
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
SFirstLastRes* p;
|
SFirstLastRes* p;
|
||||||
|
col_id_t colId;
|
||||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||||
int32_t slotId = slotIds[i];
|
int32_t slotId = slotIds[i];
|
||||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||||
|
colId = pColVal->colVal.cid;
|
||||||
p = (SFirstLastRes*)varDataVal(pRes[i]);
|
p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||||
|
|
||||||
p->ts = pColVal->ts;
|
p->ts = pColVal->ts;
|
||||||
|
@ -63,8 +65,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
||||||
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
|
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
|
||||||
continue;
|
continue;
|
||||||
}
|
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) {
|
||||||
if (pReader->numOfCols == 1 && dstSlotIds[0] != idx) {
|
|
||||||
if (!p->isNull) {
|
if (!p->isNull) {
|
||||||
colDataSetVal(pCol, numOfRows, p->buf, false);
|
colDataSetVal(pCol, numOfRows, p->buf, false);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
|
|
||||||
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
|
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
|
||||||
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
|
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
|
||||||
extern void remove_file(const char *fname);
|
extern void remove_file(const char *fname, bool last_level);
|
||||||
|
|
||||||
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
|
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
|
||||||
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
|
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
|
||||||
|
@ -532,7 +532,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
|
||||||
if (taosIsDir(file->aname)) continue;
|
if (taosIsDir(file->aname)) continue;
|
||||||
|
|
||||||
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
|
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
|
||||||
remove_file(file->aname);
|
int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs);
|
||||||
|
remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1282,4 +1283,4 @@ int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
|
||||||
fs->stop = false;
|
fs->stop = false;
|
||||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbFile2.h"
|
#include "tsdbFile2.h"
|
||||||
|
#include "cos.h"
|
||||||
|
|
||||||
// to_json
|
// to_json
|
||||||
static int32_t head_to_json(const STFile *file, cJSON *json);
|
static int32_t head_to_json(const STFile *file, cJSON *json);
|
||||||
|
@ -41,10 +42,20 @@ static const struct {
|
||||||
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
||||||
};
|
};
|
||||||
|
|
||||||
void remove_file(const char *fname) {
|
void remove_file(const char *fname, bool last_level) {
|
||||||
int32_t code = taosRemoveFile(fname);
|
int32_t code = taosRemoveFile(fname);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("file:%s remove failed", fname);
|
if (tsS3Enabled && last_level) {
|
||||||
|
const char *object_name = taosDirEntryBaseName((char *)fname);
|
||||||
|
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
|
||||||
|
if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) {
|
||||||
|
s3DeleteObjects(&object_name, 1);
|
||||||
|
} else {
|
||||||
|
tsdbError("file:%s remove failed", fname);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tsdbError("file:%s remove failed", fname);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("file:%s is removed", fname);
|
tsdbInfo("file:%s is removed", fname);
|
||||||
}
|
}
|
||||||
|
@ -224,6 +235,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
|
||||||
fobj[0]->state = TSDB_FSTATE_LIVE;
|
fobj[0]->state = TSDB_FSTATE_LIVE;
|
||||||
fobj[0]->ref = 1;
|
fobj[0]->ref = 1;
|
||||||
tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
||||||
|
fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,7 +257,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
||||||
tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
if (fobj->state == TSDB_FSTATE_DEAD) {
|
if (fobj->state == TSDB_FSTATE_DEAD) {
|
||||||
remove_file(fobj->fname);
|
remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1);
|
||||||
}
|
}
|
||||||
taosMemoryFree(fobj);
|
taosMemoryFree(fobj);
|
||||||
}
|
}
|
||||||
|
@ -261,7 +273,7 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) {
|
||||||
taosThreadMutexUnlock(&fobj->mutex);
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
remove_file(fobj->fname);
|
remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1);
|
||||||
taosMemoryFree(fobj);
|
taosMemoryFree(fobj);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -76,6 +76,7 @@ struct STFileObj {
|
||||||
STFile f[1];
|
STFile f[1];
|
||||||
int32_t state;
|
int32_t state;
|
||||||
int32_t ref;
|
int32_t ref;
|
||||||
|
int32_t nlevel;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -340,7 +340,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
||||||
int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
|
int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
|
||||||
int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont;
|
int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont;
|
||||||
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
||||||
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock);
|
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,9 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "cos.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbFS2.h"
|
#include "tsdbFS2.h"
|
||||||
#include "cos.h"
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -292,15 +292,15 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
||||||
if (expLevel < 0) { // remove the fileset
|
if (expLevel < 0) { // remove the fileset
|
||||||
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
|
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
|
||||||
if (fobj == NULL) continue;
|
if (fobj == NULL) continue;
|
||||||
|
/*
|
||||||
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
||||||
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
|
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
|
||||||
code = tsdbRemoveFileObjectS3(rtner, fobj);
|
code = tsdbRemoveFileObjectS3(rtner, fobj);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {*/
|
||||||
code = tsdbDoRemoveFileObject(rtner, fobj);
|
code = tsdbDoRemoveFileObject(rtner, fobj);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
//}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSttLvl *lvl;
|
SSttLvl *lvl;
|
||||||
|
|
|
@ -54,6 +54,19 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM
|
||||||
|
|
||||||
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
||||||
|
|
||||||
|
static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SNodeList* pTargets) {
|
||||||
|
SNode* pNode;
|
||||||
|
int32_t idx = 0;
|
||||||
|
FOREACH(pNode, pTargets) {
|
||||||
|
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx);
|
||||||
|
pColInfo->info.colId = pCol->colId;
|
||||||
|
}
|
||||||
|
idx++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -114,10 +127,12 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
capacity = TMIN(totalTables, 4096);
|
capacity = TMIN(totalTables, 4096);
|
||||||
|
|
||||||
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
|
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
|
||||||
|
setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode->pTargets);
|
||||||
blockDataEnsureCapacity(pInfo->pBufferredRes, capacity);
|
blockDataEnsureCapacity(pInfo->pBufferredRes, capacity);
|
||||||
} else { // by tags
|
} else { // by tags
|
||||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
||||||
capacity = 1; // only one row output
|
capacity = 1; // only one row output
|
||||||
|
setColIdForCacheReadBlock(pInfo->pRes, pScanNode->pTargets);
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, capacity);
|
initResultSizeInfo(&pOperator->resultInfo, capacity);
|
||||||
|
@ -192,7 +207,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pInfo->pBufferredRes->pDataBlock, i);
|
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
|
||||||
int32_t slotId = pCol->info.slotId;
|
int32_t slotId = pCol->info.slotId;
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
||||||
|
|
|
@ -1773,6 +1773,7 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
|
||||||
|
|
||||||
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
|
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
|
||||||
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
|
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
|
||||||
|
static const char* jkLastRowScanPhysiPlanTargets = "Targets";
|
||||||
|
|
||||||
static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
|
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
|
||||||
|
@ -1784,6 +1785,9 @@ static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort);
|
code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkLastRowScanPhysiPlanTargets, pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1798,6 +1802,9 @@ static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort);
|
code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanTargets, &pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2052,7 +2052,8 @@ enum {
|
||||||
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
|
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
|
||||||
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
|
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
|
||||||
PHY_LAST_ROW_SCAN_CODE_GROUP_SORT,
|
PHY_LAST_ROW_SCAN_CODE_GROUP_SORT,
|
||||||
PHY_LAST_ROW_SCAN_CODE_IGNULL
|
PHY_LAST_ROW_SCAN_CODE_IGNULL,
|
||||||
|
PHY_LAST_ROW_SCAN_CODE_TARGETS
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
@ -2068,6 +2069,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull);
|
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2091,6 +2095,9 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_LAST_ROW_SCAN_CODE_IGNULL:
|
case PHY_LAST_ROW_SCAN_CODE_IGNULL:
|
||||||
code = tlvDecodeBool(pTlv, &pNode->ignoreNull);
|
code = tlvDecodeBool(pTlv, &pNode->ignoreNull);
|
||||||
break;
|
break;
|
||||||
|
case PHY_LAST_ROW_SCAN_CODE_TARGETS:
|
||||||
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1285,6 +1285,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
|
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
|
||||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||||
nodesDestroyList(pPhyNode->pGroupTags);
|
nodesDestroyList(pPhyNode->pGroupTags);
|
||||||
|
nodesDestroyList(pPhyNode->pTargets);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||||
|
|
|
@ -552,6 +552,7 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
|
||||||
if (NULL == pScan) {
|
if (NULL == pScan) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
pScan->pTargets = nodesCloneList(pScanLogicNode->node.pTargets);
|
||||||
|
|
||||||
pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
|
pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
|
||||||
if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
|
if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
|
||||||
|
|
|
@ -283,6 +283,42 @@ class TDTestCase:
|
||||||
tdSql.checkData(0, 3, 1001)
|
tdSql.checkData(0, 3, 1001)
|
||||||
tdSql.checkData(0, 4, "2018-11-25 19:30:00.000")
|
tdSql.checkData(0, 4, "2018-11-25 19:30:00.000")
|
||||||
|
|
||||||
|
sql_template = 'select %s from meters partition by tbname'
|
||||||
|
select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, tbname", "last(c10), c10, ts"]
|
||||||
|
has_last_row_scan_res = [1,1,1]
|
||||||
|
sqls = self.format_sqls(sql_template, select_items)
|
||||||
|
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||||
|
tdSql.query(sqls[0], queryTimes=1)
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0,0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,1, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,2, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,3, '2018-11-25 19:30:00.000')
|
||||||
|
|
||||||
|
tdSql.query(sqls[1], queryTimes=1)
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0,0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,1, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,2, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,3, '2018-11-25 19:30:01.000')
|
||||||
|
|
||||||
|
sql_template = 'select %s from meters partition by t1'
|
||||||
|
select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, t1", "last(c10), c10, ts"]
|
||||||
|
has_last_row_scan_res = [1,1,1]
|
||||||
|
sqls = self.format_sqls(sql_template, select_items)
|
||||||
|
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||||
|
tdSql.query(sqls[0], queryTimes=1)
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0,0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,1, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,2, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,3, '2018-11-25 19:30:00.000')
|
||||||
|
|
||||||
|
tdSql.query("select ts, last(c10), t1, t2 from meters partition by t1, t2")
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0, 0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0, 1, '2018-11-25 19:30:01.000')
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
#time.sleep(99999999)
|
#time.sleep(99999999)
|
||||||
|
|
Loading…
Reference in New Issue