diff --git a/include/common/cos.h b/include/common/cos.h index 0610c63f9e..21b645f604 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -39,8 +39,8 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); -int32_t s3GetObjectsByPrefix(const char *prefix, const char* path); -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock); +int32_t s3GetObjectsByPrefix(const char *prefix, const char *path); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index dbbe1d92dc..4ffcb616dd 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -388,6 +388,7 @@ typedef struct SLastRowScanPhysiNode { SNodeList* pGroupTags; bool groupSort; bool ignoreNull; + SNodeList* pTargets; } SLastRowScanPhysiNode; typedef SLastRowScanPhysiNode STableCountScanPhysiNode; diff --git a/source/common/src/cos.c b/source/common/src/cos.c index daeb81b8a3..0b6b0db885 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -25,7 +25,7 @@ static S3UriStyle uriStyleG = S3UriStylePath; static int retriesG = 5; static int timeoutMsG = 0; -static int32_t s3Begin() { +int32_t s3Begin() { S3Status status; const char *hostname = tsS3Hostname; const char *env_hn = getenv("S3_HOSTNAME"); @@ -44,10 +44,12 @@ static int32_t s3Begin() { return 0; } -static void s3End() { S3_deinitialize(); } -int32_t s3Init() { return s3Begin(); } +void s3End() { S3_deinitialize(); } -void s3CleanUp() { s3End(); } +int32_t s3Init() { return 0; /*s3Begin();*/ } + +void s3CleanUp() { /*s3End();*/ +} static int should_retry() { /* @@ -73,8 +75,9 @@ static void s3PrintError(const char *func, S3Status status, char error_details[] } typedef struct { - char err_msg[128]; + char err_msg[512]; S3Status status; + uint64_t content_length; TdFilePtr file; } TS3GetData; @@ -83,10 +86,11 @@ typedef struct { S3Status status; uint64_t content_length; char *buf; + int64_t buf_pos; } TS3SizeCBD; static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) { -// (void)callbackData; + // (void)callbackData; return S3StatusOK; } @@ -109,20 +113,22 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro int len = 0; const int elen = sizeof(cbd->err_msg); if (error) { - if (error->message) { + if (error->message && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message); } - if (error->resource) { + if (error->resource && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource); } - if (error->furtherDetails) { + if (error->furtherDetails && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails); } - if (error->extraDetailsCount) { + if (error->extraDetailsCount && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n"); for (int i = 0; i < error->extraDetailsCount; i++) { - len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name, - error->extraDetails[i].value); + if (elen - len > 0) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name, + error->extraDetails[i].value); + } } } } @@ -213,8 +219,9 @@ static void growbuffer_destroy(growbuffer *gb) { } typedef struct put_object_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; + uint64_t content_length; // FILE *infile; TdFilePtr infileFD; growbuffer *gb; @@ -226,8 +233,9 @@ typedef struct put_object_callback_data { #define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M typedef struct UploadManager { - char err_msg[128]; + char err_msg[512]; S3Status status; + uint64_t content_length; // used for initial multipart char *upload_id; @@ -241,8 +249,9 @@ typedef struct UploadManager { } UploadManager; typedef struct list_parts_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; + uint64_t content_length; int isTruncated; char nextPartNumberMarker[24]; char initiatorId[256]; @@ -258,7 +267,7 @@ typedef struct list_parts_callback_data { } list_parts_callback_data; typedef struct MultipartPartData { - char err_msg[128]; + char err_msg[512]; S3Status status; put_object_callback_data put_object_data; int seq; @@ -402,7 +411,8 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; - S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listPartsCallback}; + S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, +&listPartsCallback}; list_parts_callback_data data; @@ -621,7 +631,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } typedef struct list_bucket_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; int isTruncated; char nextMarker[1024]; @@ -670,7 +680,7 @@ static void s3FreeObjectKey(void *pItem) { taosMemoryFree(key); } -static SArray* getListByPrefix(const char *prefix){ +static SArray *getListByPrefix(const char *prefix) { S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, @@ -679,7 +689,7 @@ static SArray* getListByPrefix(const char *prefix){ const char *marker = 0, *delimiter = 0; int maxkeys = 0, allDetails = 0; list_bucket_callback_data data; - data.objectArray = taosArrayInit(32, sizeof(void*)); + data.objectArray = taosArrayInit(32, sizeof(void *)); if (!data.objectArray) { uError("%s: %s", __func__, "out of memoty"); return NULL; @@ -731,23 +741,27 @@ void s3DeleteObjects(const char *object_name[], int nobject) { } void s3DeleteObjectsByPrefix(const char *prefix) { - SArray* objectArray = getListByPrefix(prefix); - if(objectArray == NULL)return; + SArray *objectArray = getListByPrefix(prefix); + if (objectArray == NULL) return; s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray)); taosArrayDestroyEx(objectArray, s3FreeObjectKey); } static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { TS3SizeCBD *cbd = callbackData; + /* if (cbd->content_length != bufferSize) { cbd->status = S3StatusAbortedByCallback; return S3StatusAbortedByCallback; } + */ + if (!cbd->buf) { + cbd->buf = taosMemoryCalloc(1, cbd->content_length); + } - char *buf = taosMemoryCalloc(1, bufferSize); - if (buf) { - memcpy(buf, buffer, bufferSize); - cbd->buf = buf; + if (cbd->buf) { + memcpy(cbd->buf + cbd->buf_pos, buffer, bufferSize); + cbd->buf_pos += bufferSize; cbd->status = S3StatusOK; return S3StatusOK; } else { @@ -756,7 +770,7 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void * } } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { int status = 0; int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; @@ -769,6 +783,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, TS3SizeCBD cbd = {0}; cbd.content_length = size; + cbd.buf_pos = 0; do { S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd); } while (S3_status_is_retryable(cbd.status) && should_retry()); @@ -778,19 +793,23 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, return TAOS_SYSTEM_ERROR(EIO); } + if (check && cbd.buf_pos != size) { + uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + return TAOS_SYSTEM_ERROR(EIO); + } + *ppBlock = cbd.buf; return 0; } static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) { - TS3GetData *cbd = (TS3GetData *) callbackData; - size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize); - return ((wrote < (size_t) bufferSize) ? - S3StatusAbortedByCallback : S3StatusOK); + TS3GetData *cbd = (TS3GetData *)callbackData; + size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize); + return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); } -int32_t s3GetObjectToFile(const char *object_name, char* fileName) { +int32_t s3GetObjectToFile(const char *object_name, char *fileName) { int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; @@ -821,21 +840,21 @@ int32_t s3GetObjectToFile(const char *object_name, char* fileName) { return 0; } -int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){ - SArray* objectArray = getListByPrefix(prefix); - if(objectArray == NULL) return -1; +int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { + SArray *objectArray = getListByPrefix(prefix); + if (objectArray == NULL) return -1; - for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){ - char* object = taosArrayGetP(objectArray, i); - const char* tmp = strchr(object, '/'); + for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) { + char *object = taosArrayGetP(objectArray, i); + const char *tmp = strchr(object, '/'); tmp = (tmp == NULL) ? object : tmp + 1; char fileName[PATH_MAX] = {0}; - if(path[strlen(path) - 1] != TD_DIRSEP_CHAR){ + if (path[strlen(path) - 1] != TD_DIRSEP_CHAR) { snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp); - }else{ + } else { snprintf(fileName, PATH_MAX, "%s%s", path, tmp); } - if(s3GetObjectToFile(object, fileName) != 0){ + if (s3GetObjectToFile(object, fileName) != 0) { taosArrayDestroyEx(objectArray, s3FreeObjectKey); return -1; } @@ -1122,7 +1141,8 @@ bool s3Get(const char *object_name, const char *path) { return ret; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) { + (void)check; int32_t code = 0; cos_pool_t *p = NULL; int is_cname = 0; @@ -1314,9 +1334,11 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } -int32_t s3GetObjectsByPrefix(const char *prefix, const char* path) { return 0; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { + return 0; +} void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } +int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; } #endif diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 463c4048a4..ab6ff53fbc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -749,7 +749,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) @@ -1693,6 +1693,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) { {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlPushInterval", &tsTtlPushIntervalSec}, + //{"s3BlockSize", &tsS3BlockSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize}, {"s3UploadDelaySec", &tsS3UploadDelaySec}, diff --git a/source/dnode/mgmt/node_mgmt/CMakeLists.txt b/source/dnode/mgmt/node_mgmt/CMakeLists.txt index f1be20289a..6b875db860 100644 --- a/source/dnode/mgmt/node_mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/node_mgmt/CMakeLists.txt @@ -3,6 +3,17 @@ add_library(dnode STATIC ${IMPLEMENT_SRC}) target_link_libraries( dnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode mgmt_dnode ) + +IF (TD_STORAGE) + + IF(${BUILD_WITH_S3}) + add_definitions(-DUSE_S3) + ELSEIF(${BUILD_WITH_COS}) + add_definitions(-DUSE_COS) + ENDIF() + +ENDIF () + target_include_directories( dnode PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index d560ba1644..6f13abcebc 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -18,17 +18,17 @@ #include "audit.h" #include "libs/function/tudf.h" -#define DM_INIT_AUDIT() \ - do { \ - auditCfg.port = tsMonitorPort; \ - auditCfg.server = tsMonitorFqdn; \ - auditCfg.comp = tsMonitorComp; \ - if (auditInit(&auditCfg) != 0) { \ - return -1; \ - } \ +#define DM_INIT_AUDIT() \ + do { \ + auditCfg.port = tsMonitorPort; \ + auditCfg.server = tsMonitorFqdn; \ + auditCfg.comp = tsMonitorComp; \ + if (auditInit(&auditCfg) != 0) { \ + return -1; \ + } \ } while (0) -static SDnode globalDnode = {0}; +static SDnode globalDnode = {0}; SDnode *dmInstance() { return &globalDnode; } @@ -146,6 +146,13 @@ static bool dmCheckDataDirVersion() { return true; } +#if defined(USE_S3) + +extern int32_t s3Begin(); +extern void s3End(); + +#endif + int32_t dmInit() { dInfo("start to init dnode env"); if (dmDiskInit() != 0) return -1; @@ -156,6 +163,9 @@ int32_t dmInit() { if (dmInitMonitor() != 0) return -1; if (dmInitAudit() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; +#if defined(USE_S3) + if (s3Begin() != 0) return -1; +#endif dInfo("dnode env is initialized"); return 0; @@ -181,6 +191,9 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); +#if defined(USE_S3) + s3End(); +#endif dInfo("dnode env is cleaned up"); taosCleanupCfg(); @@ -265,19 +278,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { pWrapper = &pDnode->wrappers[ntype]; - if(pWrapper->func.nodeRoleFp != NULL){ + if (pWrapper->func.nodeRoleFp != NULL) { ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt); dInfo("node:%s, checking node role:%d", pWrapper->name, role); - if(role == TAOS_SYNC_ROLE_VOTER){ + if (role == TAOS_SYNC_ROLE_VOTER) { dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role); terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER; return -1; } } - if(pWrapper->func.isCatchUpFp != NULL){ + if (pWrapper->func.isCatchUpFp != NULL) { dInfo("node:%s, checking node catch up", pWrapper->name); - if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){ + if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) { terrno = TSDB_CODE_MNODE_NOT_CATCH_UP; return -1; } @@ -394,7 +407,4 @@ void dmReportStartup(const char *pName, const char *pDesc) { dDebug("step:%s, %s", pStartup->name, pStartup->desc); } -int64_t dmGetClusterId() { - return globalDnode.data.clusterId; -} - +int64_t dmGetClusterId() { return globalDnode.data.clusterId; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 85e4ef0fc2..f4108b52c6 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1310,6 +1310,22 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { } tFreeSMCfgDnodeReq(&cfgReq); return 0; + } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) { + int32_t optLen = strlen("s3blocksize"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag > 1024 * 1024 || (flag > -1 && flag < 4) || flag < -1) { + mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [4, 1024 * 1024]", + cfgReq.dnodeId, flag); + terrno = TSDB_CODE_INVALID_CFG; + tFreeSMCfgDnodeReq(&cfgReq); + return -1; + } + + strcpy(dcfgReq.config, "s3blocksize"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); #endif } else { mndMCfg2DCfg(&cfgReq, &dcfgReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 42f883aa2d..73fcd27fb8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -898,6 +898,21 @@ _OVER: return code; } +int64_t mndStreamGenChkpId(SMnode *pMnode) { + SStreamObj *pStream = NULL; + void * pIter = NULL; + SSdb * pSdb = pMnode->pSdb; + int64_t maxChkpId = 0; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + maxChkpId = TMAX(maxChkpId, pStream->checkpointId); + sdbRelease(pSdb, pStream); + } + return maxChkpId + 1; +} + static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb * pSdb = pMnode->pSdb; @@ -906,7 +921,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { } SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->checkpointId = taosGetTimestampMs(); + pMsg->checkpointId = mndStreamGenChkpId(pMnode); int32_t size = sizeof(SMStreamDoCheckpointMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index aa83478105..918d0bd7d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3099,7 +3099,7 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { } */ int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock); + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock); if (code != TSDB_CODE_SUCCESS) { // taosMemoryFree(pBlock); // code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 2909b550d7..b6aa791cf0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -30,10 +30,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { uint64_t ts = 0; SFirstLastRes* p; + col_id_t colId; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); int32_t slotId = slotIds[i]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); + colId = pColVal->colVal.cid; p = (SFirstLastRes*)varDataVal(pRes[i]); p->ts = pColVal->ts; @@ -63,8 +65,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) { colDataSetVal(pCol, numOfRows, (const char*)&ts, false); continue; - } - if (pReader->numOfCols == 1 && dstSlotIds[0] != idx) { + } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) { if (!p->isNull) { colDataSetVal(pCol, numOfRows, p->buf, false); } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 348397272d..02ef75ae86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -22,7 +22,7 @@ extern int vnodeScheduleTask(int (*execute)(void *), void *arg); extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); -extern void remove_file(const char *fname); +extern void remove_file(const char *fname, bool last_level); #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) @@ -532,7 +532,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { if (taosIsDir(file->aname)) continue; if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) { - remove_file(file->aname); + int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs); + remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1); } } @@ -1282,4 +1283,4 @@ int32_t tsdbFSEnableBgTask(STFileSystem *fs) { fs->stop = false; taosThreadMutexUnlock(&fs->tsdb->mutex); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index 9edb03d35b..cc05b8ee18 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -14,6 +14,7 @@ */ #include "tsdbFile2.h" +#include "cos.h" // to_json static int32_t head_to_json(const STFile *file, cJSON *json); @@ -41,10 +42,20 @@ static const struct { [TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json}, }; -void remove_file(const char *fname) { +void remove_file(const char *fname, bool last_level) { int32_t code = taosRemoveFile(fname); if (code) { - tsdbError("file:%s remove failed", fname); + if (tsS3Enabled && last_level) { + const char *object_name = taosDirEntryBaseName((char *)fname); + long s3_size = tsS3Enabled ? s3Size(object_name) : 0; + if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) { + s3DeleteObjects(&object_name, 1); + } else { + tsdbError("file:%s remove failed", fname); + } + } else { + tsdbError("file:%s remove failed", fname); + } } else { tsdbInfo("file:%s is removed", fname); } @@ -224,6 +235,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) { fobj[0]->state = TSDB_FSTATE_LIVE; fobj[0]->ref = 1; tsdbTFileName(pTsdb, f, fobj[0]->fname); + fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); return 0; } @@ -245,7 +257,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) { tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); if (nRef == 0) { if (fobj->state == TSDB_FSTATE_DEAD) { - remove_file(fobj->fname); + remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1); } taosMemoryFree(fobj); } @@ -261,7 +273,7 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) { taosThreadMutexUnlock(&fobj->mutex); tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); if (nRef == 0) { - remove_file(fobj->fname); + remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1); taosMemoryFree(fobj); } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.h b/source/dnode/vnode/src/tsdb/tsdbFile2.h index 9da198c1f0..b94f7a9fd0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.h @@ -76,6 +76,7 @@ struct STFileObj { STFile f[1]; int32_t state; int32_t ref; + int32_t nlevel; char fname[TSDB_FILENAME_LEN]; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 151c6419cc..adb72821e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -340,7 +340,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64 int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage); int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont; int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock); + code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock); if (code != TSDB_CODE_SUCCESS) { goto _exit; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 8937ea77df..194c2784f4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" #include "tsdbFS2.h" -#include "cos.h" #include "vnd.h" typedef struct { @@ -292,15 +292,15 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { if (expLevel < 0) { // remove the fileset for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - + /* int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { code = tsdbRemoveFileObjectS3(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDoRemoveFileObject(rtner, fobj); - TSDB_CHECK_CODE(code, lino, _exit); - } + } else {*/ + code = tsdbDoRemoveFileObject(rtner, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + //} } SSttLvl *lvl; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index a7b4fe02f6..6d59698855 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -54,6 +54,19 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM #define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) +static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SNodeList* pTargets) { + SNode* pNode; + int32_t idx = 0; + FOREACH(pNode, pTargets) { + if (nodeType(pNode) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)pNode; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx); + pColInfo->info.colId = pCol->colId; + } + idx++; + } +} + SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -114,10 +127,12 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); + setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode->pTargets); blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output + setColIdForCacheReadBlock(pInfo->pRes, pScanNode->pTargets); } initResultSizeInfo(&pOperator->resultInfo, capacity); @@ -192,7 +207,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { - SColumnInfoData* pCol = taosArrayGet(pInfo->pBufferredRes->pDataBlock, i); + SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c97c920a3b..c9b49ee30f 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1773,6 +1773,7 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags"; static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort"; +static const char* jkLastRowScanPhysiPlanTargets = "Targets"; static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj; @@ -1784,6 +1785,9 @@ static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkLastRowScanPhysiPlanTargets, pNode->pTargets); + } return code; } @@ -1798,6 +1802,9 @@ static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanTargets, &pNode->pTargets); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 99100b2a1d..ea59d93d7f 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2052,7 +2052,8 @@ enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, - PHY_LAST_ROW_SCAN_CODE_IGNULL + PHY_LAST_ROW_SCAN_CODE_IGNULL, + PHY_LAST_ROW_SCAN_CODE_TARGETS }; static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2068,6 +2069,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } return code; } @@ -2091,6 +2095,9 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) { case PHY_LAST_ROW_SCAN_CODE_IGNULL: code = tlvDecodeBool(pTlv, &pNode->ignoreNull); break; + case PHY_LAST_ROW_SCAN_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 4f6d3d95e1..ee22caf574 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1285,6 +1285,7 @@ void nodesDestroyNode(SNode* pNode) { SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode; destroyScanPhysiNode((SScanPhysiNode*)pNode); nodesDestroyList(pPhyNode->pGroupTags); + nodesDestroyList(pPhyNode->pTargets); break; } case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index d6799a25a7..5cf3426e6f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -552,6 +552,7 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } + pScan->pTargets = nodesCloneList(pScanLogicNode->node.pTargets); pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags); if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) { diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index e75729f960..0f0936ebab 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -283,6 +283,42 @@ class TDTestCase: tdSql.checkData(0, 3, 1001) tdSql.checkData(0, 4, "2018-11-25 19:30:00.000") + sql_template = 'select %s from meters partition by tbname' + select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, tbname", "last(c10), c10, ts"] + has_last_row_scan_res = [1,1,1] + sqls = self.format_sqls(sql_template, select_items) + self.explain_and_check_res(sqls, has_last_row_scan_res) + tdSql.query(sqls[0], queryTimes=1) + tdSql.checkRows(10) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:01.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:00.000') + + tdSql.query(sqls[1], queryTimes=1) + tdSql.checkRows(10) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:00.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:01.000') + + sql_template = 'select %s from meters partition by t1' + select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, t1", "last(c10), c10, ts"] + has_last_row_scan_res = [1,1,1] + sqls = self.format_sqls(sql_template, select_items) + self.explain_and_check_res(sqls, has_last_row_scan_res) + tdSql.query(sqls[0], queryTimes=1) + tdSql.checkRows(5) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:01.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:00.000') + + tdSql.query("select ts, last(c10), t1, t2 from meters partition by t1, t2") + tdSql.checkRows(10) + tdSql.checkData(0, 0, '2018-11-25 19:30:00.000') + tdSql.checkData(0, 1, '2018-11-25 19:30:01.000') + def run(self): self.prepareTestEnv() #time.sleep(99999999)