Merge branch 'feat/TD-26060' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-08 17:31:16 +08:00
commit 3abf0ae4b2
41 changed files with 1003 additions and 566 deletions

View File

@ -39,6 +39,7 @@ void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path);
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path);
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);
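s3GetObjectsByPrefix() is the new addition here: it downloads every object whose key matches a prefix into a local directory, complementing the per-object getters above. A minimal usage sketch (the prefix and directory below are hypothetical; per the implementation later in this commit the call returns 0 on success and -1 on failure):
// illustrative caller, not part of this commit
const char *prefix = "checkpoint/stream-task-1/";   // hypothetical object prefix
const char *localDir = "/var/lib/taos/backup";      // hypothetical target directory
if (s3GetObjectsByPrefix(prefix, localDir) != 0) {
    // download failed; fall back to another restore path (e.g. rsync)
}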

View File

@ -79,8 +79,11 @@ extern int32_t tsHeartbeatTimeout;
extern int64_t tsVndCommitMaxIntervalMs;
// snode
extern char tsSnodeIp[];
extern char tsCheckpointBackupDir[];
extern int32_t tsRsyncPort;
extern char tsCheckpointBackupDir[];
// vnode checkpoint
extern char tsSnodeAddress[]; // 127.0.0.1:873
// mnode
extern int64_t tsMndSdbWriteDelta;
@ -104,8 +107,8 @@ extern int32_t tsMonitorMaxLogs;
extern bool tsMonitorComp;
// audit
extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable;
extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable;
// telem
extern bool tsEnableTelem;
@ -113,9 +116,9 @@ extern int32_t tsTelemInterval;
extern char tsTelemServer[];
extern uint16_t tsTelemPort;
extern bool tsEnableCrashReport;
extern char *tsTelemUri;
extern char *tsClientCrashReportUri;
extern char *tsSvrCrashReportUri;
extern char * tsTelemUri;
extern char * tsClientCrashReportUri;
extern char * tsSvrCrashReportUri;
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing

View File

@ -184,11 +184,7 @@ void qDestroyTask(qTaskInfo_t tinfo);
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
@ -217,7 +213,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo);

View File

@ -247,6 +247,24 @@ typedef struct {
SEpSet epset;
} SDownstreamTaskEpset;
typedef enum {
TASK_SCANHISTORY_CONT = 0x1,
TASK_SCANHISTORY_QUIT = 0x2,
TASK_SCANHISTORY_REXEC = 0x3,
} EScanHistoryRet;
typedef struct {
EScanHistoryRet ret;
int32_t idleTime;
} SScanhistoryDataInfo;
typedef struct {
int32_t idleDuration; // idle time before the scan-history task resumes in the next execution time slice
int32_t numOfTicks;
tmr_h pTimer;
int32_t execCount;
} SScanhistorySchedInfo;
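Together these types change the scan-history call from fire-and-forget into a time-sliced contract: the executor reports whether the caller should continue, quit, or re-execute after an idle period. A condensed sketch of the intended caller pattern, taken from the tq changes later in this commit (error handling omitted):
// illustrative only; mirrors tqProcessTaskScanHistory below
int64_t st = taosGetTimestampMs();
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
    streamReExecScanHistoryFuture(pTask, retInfo.idleTime);  // re-arm via timer after idleTime
} else if (retInfo.ret == TASK_SCANHISTORY_QUIT) {
    // task paused/stopped/dropping: reset sched status and return
} else {                                                     // TASK_SCANHISTORY_CONT
    // step 1 finished, proceed to step 2 / WAL scanning
}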
typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
@ -364,7 +382,9 @@ typedef struct STaskExecStatisInfo {
int64_t init;
int64_t start;
int64_t step1Start;
double step1El;
int64_t step2Start;
double step2El;
int32_t updateCount;
int64_t latestUpdateTs;
int32_t processDataBlocks;
@ -388,9 +408,10 @@ typedef struct STaskOutputInfo {
union {
STaskDispatcherFixed fixedDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
int8_t type;
STokenBucket* pTokenBucket;
@ -424,7 +445,10 @@ struct SStreamTask {
SStreamState* pState; // state backend
SArray* pRspMsgList;
SUpstreamInfo upstreamInfo;
// the following attributes are not serialized
SScanhistorySchedInfo schedHistoryInfo;
int32_t notReadyTasks;
int32_t numOfWaitingUpstream;
int64_t checkReqId;
@ -441,8 +465,10 @@ struct SStreamTask {
typedef struct STaskStartInfo {
int64_t startTs;
int64_t readyTs;
int32_t startAllTasksFlag;
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t tasksWillRestart;
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
int32_t elapsedTime;
} STaskStartInfo;
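The old startAllTasksFlag is split in two: tasksWillRestart records that a restart is pending after a nodeEp update, while taskStarting is the sentinel that serializes the restart procedure itself. A condensed sketch of how the two flags are used by the tq changes later in this commit:
// set under the meta write lock when a nodeEp update requires a restart
pMeta->startInfo.tasksWillRestart = 1;
// restart path: only one thread may enter, others wait and recheck
while (atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1) != 0) {
    taosMsleep(500);
}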
@ -474,10 +500,10 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo;
STaskUpdateInfo updateInfo;
SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int64_t rid;
SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int64_t rid;
int64_t chkpId;
int32_t chkpCap;
@ -743,9 +769,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
@ -767,7 +791,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
@ -790,12 +816,11 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
// agg level
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta
@ -819,11 +844,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -237,6 +237,12 @@ void syslog(int unused, const char *format, ...);
#define TD_DIRSEP "/"
#endif
#if defined(_WIN32)
#define TD_DIRSEP_CHAR '\\'
#else
#define TD_DIRSEP_CHAR '/'
#endif
#define TD_LOCALE_LEN 64
#define TD_CHARSET_LEN 64
#define TD_TIMEZONE_LEN 96
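TD_DIRSEP_CHAR is the single-character counterpart of the TD_DIRSEP string macro, so callers can test a path's trailing character before appending a name. A minimal sketch of the pattern used by s3GetObjectsByPrefix later in this commit (fileName and name are placeholders):
char fileName[PATH_MAX] = {0};
if (path[strlen(path) - 1] != TD_DIRSEP_CHAR) {
    snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, name);  // insert a separator
} else {
    snprintf(fileName, PATH_MAX, "%s%s", path, name);               // separator already present
}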

View File

@ -1052,6 +1052,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
}
}
pRequest->body.execMode = pQuery->execMode;
switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL:
if (!pRequest->validateOnly) {

View File

@ -46,6 +46,75 @@ target_link_libraries(
INTERFACE api
)
if(${BUILD_S3})
if(${BUILD_WITH_S3})
target_include_directories(
common
PUBLIC "$ENV{HOME}/.cos-local.2/include"
)
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(XML2_LIBRARY xml2)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
target_link_libraries(
common
# s3
PUBLIC ${S3_LIBRARY}
PUBLIC ${CURL_LIBRARY}
PUBLIC ${SSL_LIBRARY}
PUBLIC ${CRYPTO_LIBRARY}
PUBLIC ${XML2_LIBRARY}
)
add_definitions(-DUSE_S3)
endif()
if(${BUILD_WITH_COS})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
find_library(CURL_LIBRARY curl)
target_link_libraries(
common
# s3
PUBLIC cos_c_sdk_static
PUBLIC ${APR_UTIL_LIBRARY}
PUBLIC ${APR_LIBRARY}
PUBLIC ${MINIXML_LIBRARY}
PUBLIC ${CURL_LIBRARY}
)
# s3
FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
IF (APR_CONFIG_BIN)
EXECUTE_PROCESS(
COMMAND ${APR_CONFIG_BIN} --includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF()
include_directories (${APR_INCLUDE_DIR})
target_include_directories(
common
PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk"
PUBLIC "$ENV{HOME}/.cos-local.1/include"
)
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})
endif()
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})

View File

@ -13,6 +13,7 @@ extern int8_t tsS3Https;
#if defined(USE_S3)
#include "libs3.h"
#include "tarray.h"
static int verifyPeerG = 0;
static const char *awsRegionG = NULL;
@ -34,7 +35,7 @@ static int32_t s3Begin() {
}
if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) {
vError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
uError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
return -1;
}
@ -65,12 +66,18 @@ static int should_retry() {
static void s3PrintError(const char *func, S3Status status, char error_details[]) {
if (status < S3StatusErrorAccessDenied) {
vError("%s: %s", __func__, S3_get_status_name(status));
uError("%s: %s", __func__, S3_get_status_name(status));
} else {
vError("%s: %s, %s", __func__, S3_get_status_name(status), error_details);
uError("%s: %s, %s", __func__, S3_get_status_name(status), error_details);
}
}
typedef struct {
char err_msg[128];
S3Status status;
TdFilePtr file;
} TS3GetData;
typedef struct {
char err_msg[128];
S3Status status;
@ -78,6 +85,11 @@ typedef struct {
char *buf;
} TS3SizeCBD;
static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) {
// (void)callbackData;
return S3StatusOK;
}
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {
//(void)callbackData;
TS3SizeCBD *cbd = callbackData;
@ -291,7 +303,7 @@ S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
}
S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) {
responsePropertiesCallback(properties, callbackData);
responsePropertiesCallbackNull(properties, callbackData);
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
@ -390,7 +402,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &listPartsCallback};
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listPartsCallback};
list_parts_callback_data data;
@ -445,13 +457,13 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
data.noStatus = noStatus;
if (taosStatFile(file, &contentLength, NULL, NULL) < 0) {
vError("ERROR: %s Failed to stat file %s: ", __func__, file);
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
vError("ERROR: %s Failed to open file %s: ", __func__, file);
uError("ERROR: %s Failed to open file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
@ -469,7 +481,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
metaProperties, useServerSideEncryption};
if (contentLength <= MULTIPART_CHUNK_SIZE) {
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&putObjectDataCallback};
do {
@ -486,7 +498,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
s3PrintError(__func__, data.status, data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data.contentLength) {
vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
(unsigned long long)data.contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
}
@ -506,14 +518,14 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3MultipartInitialHandler handler = {{&responsePropertiesCallback, &responseCompleteCallback},
S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&initial_multipart_callback};
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallback, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq);
manager.next_etags_pos = 0;
@ -658,19 +670,19 @@ static void s3FreeObjectKey(void *pItem) {
taosMemoryFree(key);
}
void s3DeleteObjectsByPrefix(const char *prefix) {
static SArray* getListByPrefix(const char *prefix){
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&listBucketCallback};
const char *marker = 0, *delimiter = 0;
int maxkeys = 0, allDetails = 0;
list_bucket_callback_data data;
data.objectArray = taosArrayInit(32, POINTER_BYTES);
data.objectArray = taosArrayInit(32, sizeof(void*));
if (!data.objectArray) {
vError("%s: %s", __func__, "out of memory");
return;
uError("%s: %s", __func__, "out of memory");
return NULL;
}
if (marker) {
snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker);
@ -693,18 +705,15 @@ void s3DeleteObjectsByPrefix(const char *prefix) {
if (data.status == S3StatusOK) {
if (data.keyCount > 0) {
// printListBucketHeader(allDetails);
s3DeleteObjects(TARRAY_DATA(data.objectArray), TARRAY_SIZE(data.objectArray));
return data.objectArray;
}
} else {
s3PrintError(__func__, data.status, data.err_msg);
}
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
return NULL;
}
void s3DeleteObjects(const char *object_name[], int nobject) {
int status = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
@ -721,6 +730,13 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
}
}
void s3DeleteObjectsByPrefix(const char *prefix) {
SArray* objectArray = getListByPrefix(prefix);
if(objectArray == NULL)return;
s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
}
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
TS3SizeCBD *cbd = callbackData;
if (cbd->content_length != bufferSize) {
@ -758,7 +774,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
} while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) {
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
return TAOS_SYSTEM_ERROR(EIO);
}
@ -767,6 +783,67 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
return 0;
}
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
TS3GetData *cbd = (TS3GetData *) callbackData;
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
return ((wrote < (size_t) bufferSize) ?
S3StatusAbortedByCallback : S3StatusOK);
}
int32_t s3GetObjectToFile(const char *object_name, char* fileName) {
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
const char *ifMatch = 0, *ifNotMatch = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&getObjectCallback};
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[s3] open file error, errno:%d, fileName:%s", errno, fileName);
return -1;
}
TS3GetData cbd = {0};
cbd.file = pFile;
do {
S3_get_object(&bucketContext, object_name, &getConditions, 0, 0, 0, 0, &getObjectHandler, &cbd);
} while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) {
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
taosCloseFile(&pFile);
return TAOS_SYSTEM_ERROR(EIO);
}
taosCloseFile(&pFile);
return 0;
}
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){
SArray* objectArray = getListByPrefix(prefix);
if(objectArray == NULL) return -1;
for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){
char* object = taosArrayGetP(objectArray, i);
const char* tmp = strchr(object, '/');
tmp = (tmp == NULL) ? object : tmp + 1;
char fileName[PATH_MAX] = {0};
if(path[strlen(path) - 1] != TD_DIRSEP_CHAR){
snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp);
}else{
snprintf(fileName, PATH_MAX, "%s%s", path, tmp);
}
if(s3GetObjectToFile(object, fileName) != 0){
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
return -1;
}
}
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
return 0;
}
long s3Size(const char *object_name) {
long size = 0;
int status = 0;
@ -782,7 +859,7 @@ long s3Size(const char *object_name) {
} while (S3_status_is_retryable(cbd.status) && should_retry());
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
}
size = cbd.content_length;
@ -1237,6 +1314,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {}
void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; }
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path) { return 0; }
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; }
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }

View File

@ -141,7 +141,7 @@ void startRsync(){
}
char cmd[PATH_MAX] = {0};
snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir);
snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir);
// start the rsync service to back up checkpoints
code = system(cmd);
if(code != 0){
@ -168,7 +168,7 @@ int uploadRsync(char* id, char* path){
#else
path
#endif
, tsSnodeIp, id);
, tsSnodeAddress, id);
}else{
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
#ifdef WINDOWS
@ -176,7 +176,7 @@ int uploadRsync(char* id, char* path){
#else
path
#endif
, tsSnodeIp, id);
, tsSnodeAddress, id);
}
int code = execCommand(command);
@ -195,7 +195,7 @@ int downloadRsync(char* id, char* path){
#endif
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
tsSnodeIp, id,
tsSnodeAddress, id,
#ifdef WINDOWS
pathTransform
#else
@ -221,7 +221,7 @@ int deleteRsync(char* id){
}
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/",
tmp, tsSnodeIp, id);
tmp, tsSnodeAddress, id);
code = execCommand(command);
taosRemoveDir(tmp);

View File

@ -128,7 +128,8 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
// int32_t tsSmlBatchSize = 10000;
// checkpoint backup
char tsSnodeIp[TSDB_FQDN_LEN] = {0};
char tsSnodeAddress[TSDB_FQDN_LEN] = {0};
int32_t tsRsyncPort = 873;
#ifdef WINDOWS
char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\";
#else
@ -675,10 +676,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddString(pCfg, "snodeIp", tsSnodeIp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
@ -1116,8 +1120,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tsRsyncPort = cfgGetItem(pCfg, "rsyncPort")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tstrncpy(tsSnodeIp, cfgGetItem(pCfg, "snodeIp")->str, TSDB_FQDN_LEN);
tstrncpy(tsSnodeAddress, cfgGetItem(pCfg, "snodeAddress")->str, TSDB_FQDN_LEN);
tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
@ -1295,7 +1300,7 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
if (strcasecmp("keepColumnName", name) == 0) {
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
} else if (strcasecmp("keepAliveIdle", name) == 0) {
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->bval;
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
} else {
matchItem = false;
}

View File

@ -160,75 +160,6 @@ target_link_libraries(
PUBLIC index
)
if(${BUILD_S3})
if(${BUILD_WITH_S3})
target_include_directories(
vnode
PUBLIC "$ENV{HOME}/.cos-local.2/include"
)
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(XML2_LIBRARY xml2)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
target_link_libraries(
vnode
# s3
PUBLIC ${S3_LIBRARY}
PUBLIC ${CURL_LIBRARY}
PUBLIC ${SSL_LIBRARY}
PUBLIC ${CRYPTO_LIBRARY}
PUBLIC ${XML2_LIBRARY}
)
add_definitions(-DUSE_S3)
endif()
if(${BUILD_WITH_COS})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
find_library(CURL_LIBRARY curl)
target_link_libraries(
vnode
# s3
PUBLIC cos_c_sdk_static
PUBLIC ${APR_UTIL_LIBRARY}
PUBLIC ${APR_LIBRARY}
PUBLIC ${MINIXML_LIBRARY}
PUBLIC ${CURL_LIBRARY}
)
# s3
FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
IF (APR_CONFIG_BIN)
EXECUTE_PROCESS(
COMMAND ${APR_CONFIG_BIN} --includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF()
include_directories (${APR_INCLUDE_DIR})
target_include_directories(
vnode
PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk"
PUBLIC "$ENV{HOME}/.cos-local.1/include"
)
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})
endif()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
ENDIF ()

View File

@ -43,9 +43,9 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore;
// tqPush
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2)
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
// tqExec
@ -156,9 +156,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq);
int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);

View File

@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqLaunchStreamTaskAsync(STQ* pTq);
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -1080,7 +1080,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
return code;
}
static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
@ -1121,7 +1121,11 @@ static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq)
}
}
// this function should be executed by only one thread
static void ddxx() {
}
// this function should be executed by only one thread, so we set a sentinel to protect it
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1150,6 +1154,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
}
// let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
int64_t ts = taosGetTimestampMs();
@ -1163,7 +1168,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
} else {
if (pTask->execInfo.step2Start == 0) {
tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start);
tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
} else {
tqDebug("s-task:%s already in step2, no need to scan history data, step2 startTs:%" PRId64, id,
pTask->execInfo.step2Start);
@ -1184,20 +1190,37 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
streamScanHistoryData(pTask);
int64_t st = taosGetTimestampMs();
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE) {
double el = (taosGetTimestampMs() - st) / 1000.0;
pTask->execInfo.step1El += el;
if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
} else {
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
el, pTask->execInfo.step1El, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, p,
pTask->execInfo.step1El);
}
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
// the following procedure should be executed regardless of whether the status is stop/pause
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
if (pTask->info.fillHistory) {
SStreamTask* pStreamTask = NULL;
@ -1220,23 +1243,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code == TSDB_CODE_SUCCESS) {
doStartStep2(pTask, pStreamTask, pTq);
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
} else {
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
// Not update the fill-history time window until the state transfer is completed if the related fill-history task
// exists.
tqDebug(
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, startVer:%" PRId64
", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
// Do not update the fill-history time window until the state transfer is completed.
tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64
", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
code = streamTaskScanHistoryDataComplete(pTask);
}
@ -1314,14 +1334,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
tqStartStreamTask(pTq);
return 0;
}
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
tqScanWal(pTq);
return 0;
} else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
tqStartStreamTasks(pTq);
return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
tqRestartStreamTasks(pTq);
return 0;
}
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
@ -1509,6 +1530,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
streamSchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?
if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
@ -1905,7 +1927,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.startAllTasksFlag = 1;
pMeta->startInfo.tasksWillRestart = 1;
if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
@ -1914,45 +1936,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} else {
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.startAllTasksFlag = 0;
pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta);
} else {
tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
terrno = 0;
streamMetaWUnLock(pMeta);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqLaunchStreamTaskAsync(pTq);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
tqStartStreamTaskAsync(pTq, true);
}
}

View File

@ -25,7 +25,7 @@ typedef struct STableSinkInfo {
tstr name;
} STableSinkInfo;
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData);

View File

@ -22,6 +22,8 @@
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
// extract data blocks (submit/delete) from the WAL and add them into the input queue of all source tasks.
int32_t tqScanWal(STQ* pTq) {
@ -58,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) {
return 0;
}
int32_t tqStartStreamTask(STQ* pTq) {
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -73,6 +75,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
@ -97,7 +100,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskReadyInfo(pTask);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
@ -115,7 +118,67 @@ int32_t tqStartStreamTask(STQ* pTq) {
return code;
}
int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
int32_t tqRestartStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
terrno = 0;
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);
code = streamMetaLoadAllTasks(pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTasks(pTq);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
@ -132,10 +195,10 @@ int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
return -1;
}
tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks);
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID;
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
@ -320,14 +383,13 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
return false;
}
static bool taskReadyForDataFromWal(SStreamTask* pTask) {
bool taskReadyForDataFromWal(SStreamTask* pTask) {
// non-source or fill-history tasks don't need to respond to the WAL scan action.
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
return false;
}
// not in ready state, do not handle the data from wal
// int32_t status = pTask->status.taskStatus;
char* p = NULL;
int32_t status = streamTaskGetStatus(pTask, &p);
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) {
@ -359,7 +421,7 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
return true;
}
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
const char* id = pTask->id.idStr;
int32_t numOfNewItems = 0;
@ -449,7 +511,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);
tqDebug("s-task:%s lock", pTask->id.idStr);
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);

View File

@ -846,14 +846,14 @@ static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) {
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[0].pin = true;
ASSERT(!pInfo->blockData[1].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
return;
}
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[1].pin = true;
ASSERT(!pInfo->blockData[0].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id);
tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id);
return;
}

View File

@ -565,9 +565,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (ASCENDING_TRAVERSE(pReader->info.order)) {
w.skey = pScanInfo->lastKey + step;
w.skey = pScanInfo->lastProcKey + step;
} else {
w.ekey = pScanInfo->lastKey + step;
w.ekey = pScanInfo->lastProcKey + step;
}
if (isEmptyQueryTimeWindow(&w)) {
@ -607,14 +607,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
clearBrinBlockIter(&iter);
pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size;
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
pBlockNum->numOfSttFiles = pReader->status.pCurrentFileset->lvlArr->size;
int32_t total = pBlockNum->numOfSttFiles + pBlockNum->numOfBlocks;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug(
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles,
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfSttFiles,
sizeInDisk / 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += total;
@ -1200,13 +1200,12 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p
}
}
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader* pLastBlockReader, int32_t order) {
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo, int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order);
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
int64_t key = 0;
if (bHasDataInLastBlock) {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt);
} else {
key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey;
@ -1215,10 +1214,10 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader
return key;
}
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock,
SLastBlockReader* pLastBlockReader, int32_t order) {
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo,
int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order);
int64_t key = getBoarderKeyInFiles(pBlock, pLastBlockReader, order);
int64_t key = getBoarderKeyInFiles(pBlock, pScanInfo, order);
return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) ||
(!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key));
@ -1302,10 +1301,9 @@ typedef struct {
} SDataBlockToLoadInfo;
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
STsdbReader* pReader) {
int32_t neighborIndex = 0;
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
SBrinRecord rec = {0};
int32_t neighborIndex = 0;
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
pReader->info.order, &rec);
@ -1319,9 +1317,11 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order);
if (hasDataInLastBlock(pLastBlockReader)) {
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
pInfo->overlapWithLastBlock = !(pBlockInfo->record.lastKey < tsLast || pBlockInfo->record.firstKey > tsLast);
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
pInfo->overlapWithLastBlock =
!(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt);
}
pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity;
@ -1336,9 +1336,9 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
// 5. delete info should not overlap with current block data
// 6. current block should not contain the duplicated ts
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
TSDBKEY keyInBuf) {
SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
bool loadDataBlock =
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
@ -1358,9 +1358,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
}
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
TSDBKEY keyInBuf) {
SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
info.overlapWithDelInfo || info.overlapWithLastBlock);
return isCleanFileBlock;
@ -1417,14 +1417,15 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return code;
}
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) {
static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) {
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->lastKeyInStt += step;
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey += step;
return false;
}
@ -1433,10 +1434,11 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
pLastBlockReader->currentKey = key;
pScanInfo->lastKeyInStt = key;
pScanInfo->sttKeyInfo.nextProcKey = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
pVerRange)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
}
}
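With this change the reader no longer tracks a bare lastKeyInStt; the sttKeyInfo pair (status, nextProcKey) lets it distinguish an uninitialized stt reader from one that is exhausted. A condensed sketch of how the status is consulted elsewhere in this commit (illustrative only; identifiers are taken from this diff):
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
    initLastBlockReader(pLastBlockReader, pScanInfo, pReader);   // lazily open the stt reader
}
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
    int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;        // next key to merge from stt files
    // compare/merge against the data block and in-memory buffer keys
} else {
    // STT_FILE_NO_DATA: stt data exhausted, serve the block/buffer data directly
}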
@ -1457,7 +1459,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
// avoid having the fetch of the next row replace the referenced stt block in the buffer
doPinSttBlock(pLastBlockReader);
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
doUnpinSttBlock(pLastBlockReader);
if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
@ -1694,7 +1696,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
if (copied) {
pBlockScanInfo->lastKey = tsLastBlock;
pBlockScanInfo->lastProcKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
@ -2062,9 +2064,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STbData* d = NULL;
TSDBKEY startKey = {0};
if (ASCENDING_TRAVERSE(pReader->info.order)) {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->info.verRange.minVer};
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey + 1, .version = pReader->info.verRange.minVer};
} else {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer};
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey - 1, .version = pReader->info.verRange.maxVer};
}
int32_t code =
@ -2129,9 +2131,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKeyInStt;
w.skey = pScanInfo->sttKeyInfo.nextProcKey;
} else {
w.ekey = pScanInfo->lastKeyInStt;
w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
}
int64_t st = taosGetTimestampUs();
@ -2164,7 +2166,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
int64_t el = taosGetTimestampUs() - st;
pReader->cost.initLastBlockReader += (el / 1000.0);
@ -2209,7 +2211,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
}
if (copied) {
pBlockScanInfo->lastKey = key;
pBlockScanInfo->lastProcKey = key;
return TSDB_CODE_SUCCESS;
} else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
@ -2354,16 +2356,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
(pRecord->numRow <= pReader->resBlockInfo.capacity)) {
if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
int64_t cap = pReader->resBlockInfo.capacity;
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) {
if (asc || (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
code = copyBlockDataToSDataBlock(pReader);
if (code) {
goto _end;
}
// record the last key value
pBlockScanInfo->lastKey = asc ? pRecord->lastKey : pRecord->firstKey;
pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey;
goto _end;
}
}
@ -2378,6 +2380,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
SBlockData* pBlockData = &pReader->status.fileBlockData;
initLastBlockReader(pLastBlockReader, pBlockScanInfo, pReader);
while (1) {
bool hasBlockData = false;
@ -2527,7 +2530,7 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) {
SReaderStatus* pStatus = &pReader->status;
pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastFiles = 0;
pBlockNum->numOfSttFiles = 0;
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk));
@ -2564,7 +2567,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
return code;
}
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
break;
}
}
@ -2684,11 +2687,13 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
}
}
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, SLastBlockReader* pLastBlockReader, bool asc) {
if(!hasDataInLastBlock(pLastBlockReader)) {
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
return true;
} else {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt);
}
}
@ -2717,10 +2722,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return terrno;
}
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
}
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf)) {
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2728,13 +2735,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// build composed data block
code = buildComposedDataBlock(pReader);
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pLastBlockReader, pReader->info.order)) {
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pScanInfo, pReader->info.order)) {
// data in memory that are earlier than current file block and stt blocks
// rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pLastBlockReader, pReader->info.order);
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else {
if (notOverlapWithSttFiles(pBlockInfo, pLastBlockReader, asc)) {
if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) {
// whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
@ -2745,7 +2752,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey;
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
@ -2760,8 +2767,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
tsdbDebug("load data in last block firstly %s", pReader->idStr);
int64_t st = taosGetTimestampUs();
// let's load data from stt files
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
// no data in last block, no need to proceed.
while (hasDataInLastBlock(pLastBlockReader)) {
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2988,7 +3000,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
if (pBlockInfo) {
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) {
lastKey = pScanInfo->lastKey;
lastKey = pScanInfo->lastProcKey;
}
pDumpInfo->totalRows = pBlockInfo->record.numRow;
@ -3013,7 +3025,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
}
// all data files are consumed, try data in buffer
if (num.numOfBlocks + num.numOfLastFiles == 0) {
if (num.numOfBlocks + num.numOfSttFiles == 0) {
pReader->status.loadFromFile = false;
taosArrayDestroy(pTableList);
return code;
@ -3458,15 +3470,15 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
idStr);
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
pScanInfo->sttKeyInfo.nextProcKey, idStr);
break;
}
}
@ -3722,7 +3734,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
pBlock->info.dataLoad = 1;
pBlock->info.rows += 1;
pScanInfo->lastKey = pTSRow->ts;
pScanInfo->lastProcKey = pTSRow->ts;
return TSDB_CODE_SUCCESS;
}
@ -3856,14 +3868,15 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
// todo extract method
if (ASCENDING_TRAVERSE(pReader->info.order)) {
int64_t skey = pReader->info.window.skey;
pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->lastKeyInStt = skey;
pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->sttKeyInfo.nextProcKey = skey;
} else {
int64_t ekey = pReader->info.window.ekey;
pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->lastKeyInStt = ekey;
pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->sttKeyInfo.nextProcKey = ekey;
}
pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
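The seeding above places lastProcKey one step outside the query window and nextProcKey on the window boundary, guarding against 64-bit overflow at INT64_MIN/INT64_MAX. A distilled standalone sketch of that pattern (hypothetical struct and function names, not the reader's real types):

#include <stdint.h>

// lastProcKey starts one step outside the query window so the first row on the
// boundary is still processed; nextProcKey starts on the boundary itself.
typedef struct {
  int64_t lastProcKey;
  int64_t nextProcKey;
} ProcKeys;

static ProcKeys initProcKeys(int ascending, int64_t skey, int64_t ekey) {
  ProcKeys k;
  if (ascending) {
    k.lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;  // avoid underflow
    k.nextProcKey = skey;
  } else {
    k.lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;  // avoid overflow
    k.nextProcKey = ekey;
  }
  return k;
}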
@ -4224,7 +4237,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
// reset the current table's data block scan info,
pBlockScanInfo->iterInit = false;

View File

@ -157,17 +157,18 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
int64_t skey = pTsdbReader->info.window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastKeyInStt = skey;
pScanInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->sttKeyInfo.nextProcKey = skey;
} else {
int64_t ekey = pTsdbReader->info.window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastKeyInStt = ekey;
pScanInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->sttKeyInfo.nextProcKey = ekey;
}
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr);
pScanInfo->lastProcKey, pTsdbReader->idStr);
}
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
@ -200,8 +201,8 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastKey = ts;
pInfo->lastKeyInStt = ts + step;
pInfo->lastProcKey = ts;
pInfo->sttKeyInfo.nextProcKey = ts + step;
}
}
@ -241,6 +242,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->pBlockIdxList);
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
}
void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) {

View File

@ -63,20 +63,31 @@ typedef struct STableDataBlockIdx {
int32_t globalIndex;
} STableDataBlockIdx;
typedef enum ESttKeyStatus {
STT_FILE_READER_UNINIT = 0x0,
STT_FILE_NO_DATA = 0x1,
STT_FILE_HAS_DATA = 0x2,
} ESttKeyStatus;
typedef struct SSttKeyInfo {
ESttKeyStatus status; // this value should be updated when switching to the next fileset
int64_t nextProcKey;
} SSttKeyInfo;
typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
TSKEY lastKeyInStt; // last accessed key in stt
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
uint64_t uid;
TSKEY lastProcKey;
SSttKeyInfo sttKeyInfo;
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo;
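The new sttKeyInfo field replaces lastKeyInStt and lets later code defer opening the stt readers until a table is actually visited. A rough standalone sketch of that status-gated lazy initialization, with hypothetical names standing in for the real reader API:

#include <stdbool.h>
#include <stdint.h>

typedef enum { READER_UNINIT = 0, READER_NO_DATA = 1, READER_HAS_DATA = 2 } SttStatus;

typedef struct {
  SttStatus status;       // reset to READER_UNINIT when switching to the next fileset
  int64_t   nextProcKey;  // next stt key to process for this table
} SttKeyState;

// openReader is a stand-in for initializing the real stt reader; it reports
// whether any data exists and, if so, the first key to process.
static bool sttHasData(SttKeyState* st, bool (*openReader)(int64_t* firstKey)) {
  if (st->status == READER_UNINIT) {
    int64_t firstKey = 0;
    st->status = openReader(&firstKey) ? READER_HAS_DATA : READER_NO_DATA;
    if (st->status == READER_HAS_DATA) {
      st->nextProcKey = firstKey;
    }
  }
  return st->status == READER_HAS_DATA;
}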
typedef struct SResultBlockInfo {
@ -108,7 +119,7 @@ typedef struct STableUidList {
typedef struct {
int32_t numOfBlocks;
int32_t numOfLastFiles;
int32_t numOfSttFiles;
} SBlockNumber;
typedef struct SBlockIndex {

View File

@ -557,7 +557,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
streamMetaWLock(pMeta);
if (pMeta->startInfo.startAllTasksFlag) {
if (pMeta->startInfo.tasksWillRestart) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
streamMetaWUnLock(pMeta);
return;
@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
} else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqResetStreamTaskStatus(pVnode->pTq);
tqLaunchStreamTaskAsync(pVnode->pTq);
tqStartStreamTaskAsync(pVnode->pTq, false);
}
} else {
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);

View File

@ -871,32 +871,6 @@ int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
}
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
if (pTaskInfo->pRoot == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t nOptrWithVal = 0;
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
// taosMemoryFreeClear(*pOutput);
// *len = 0;
// }
return 0;
}
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
if (pTaskInfo == NULL || pInput == NULL || len == 0) {
return TSDB_CODE_INVALID_PARA;
}
return 0;
// return decodeOperator(pTaskInfo->pRoot, pInput, len);
}
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
@ -1072,7 +1046,7 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
}
}
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished;
}

View File

@ -1584,8 +1584,7 @@ typedef union SRowsDataContext{
SStbRowsDataContext* pStbRowsCxt;
} SRowsDataContext;
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken,
char* ctbName, bool* pFoundCtbName) {
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, bool* pFoundCtbName) {
*pFoundCtbName = false;
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS){
@ -1595,7 +1594,13 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext*
if (pToken->n > 0) {
if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) {
memcpy(pStbRowsCxt->ctbName.tname, pToken->z, pToken->n);
for (int i = 0; i < pToken->n; ++i) {
if (pToken->z[i] == '.') {
return buildInvalidOperationMsg(&pCxt->msg, "tbname can not contain '.'");
} else {
pStbRowsCxt->ctbName.tname[i] = pToken->z[i];
}
}
pStbRowsCxt->ctbName.tname[pToken->n] = '\0';
*pFoundCtbName = true;
} else {
@ -1677,8 +1682,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
}
}
else if (pCols->pColIndex[i] == tbnameIdx) {
char ctbName[TSDB_TABLE_NAME_LEN];
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, bFoundTbName);
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, bFoundTbName);
}
if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {

View File

@ -266,7 +266,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
if (NULL == pScanCols) {
if (NULL == pScanPseudoCols) {
return SCAN_TYPE_TABLE;
return (!tagScan) ? SCAN_TYPE_TABLE : SCAN_TYPE_TAG;
}
return FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType
? SCAN_TYPE_BLOCK_INFO

View File

@ -136,7 +136,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
@ -151,16 +151,20 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
//#define CHECKPOINT_PATH_LEN 128
// typedef struct SChekpointDataHeader{
// int64_t size;
// char name[CHECKPOINT_PATH_LEN];
// char id[CHECKPOINT_PATH_LEN];
//} SChekpointDataHeader;
typedef enum UPLOAD_TYPE {
UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0,
UPLOAD_RSYNC = 1,
} UPLOAD_TYPE;
int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
UPLOAD_TYPE getUploadType();
int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "rsync.h"
#include "streamInt.h"
@ -384,105 +385,55 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
// static int64_t kBlockSize = 64 * 1024;
// static int sendCheckpointToS3(char* id, SArray* fileList){
// code = s3PutObjectFromFile2(from->fname, object_name);
// return 0;
//}
// static int sendCheckpointToSnode(char* id, SArray* fileList){
// if(strlen(id) >= CHECKPOINT_PATH_LEN){
// tqError("uploadCheckpoint id name too long, name:%s", id);
// return -1;
// }
// uint8_t* buf = taosMemoryCalloc(1, sizeof(SChekpointDataHeader) + kBlockSize);
// if(buf == NULL){
// tqError("uploadCheckpoint malloc failed");
// return -1;
// }
//
// SChekpointDataHeader* pHdr = (SChekpointDataHeader*)buf;
// strcpy(pHdr->id, id);
//
// TdFilePtr fd = NULL;
// for(int i = 0; i < taosArrayGetSize(fileList); i++){
// char* name = (char*)taosArrayGetP(fileList, i);
// if(strlen(name) >= CHECKPOINT_PATH_LEN){
// tqError("uploadCheckpoint file name too long, name:%s", name);
// return -1;
// }
// int64_t offset = 0;
//
// fd = taosOpenFile(name, TD_FILE_READ);
// tqDebug("uploadCheckpoint open file %s, file index: %d", name, i);
//
// while(1){
// int64_t nread = taosPReadFile(fd, buf + sizeof(SChekpointDataHeader), kBlockSize, offset);
// if (nread == -1) {
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// tqError("uploadCheckpoint failed to read file name:%s,reason:%d", name, errno);
// return -1;
// } else if (nread == 0){
// tqDebug("uploadCheckpoint no data read, close file:%s, move to next file, open and read", name);
// taosCloseFile(&fd);
// break;
// } else if (nread == kBlockSize){
// offset += nread;
// } else {
// taosCloseFile(&fd);
// offset = 0;
// }
// tqDebug("uploadCheckpoint read file %s, size:%" PRId64 ", current offset:%" PRId64, name, nread, offset);
//
//
// pHdr->size = nread;
// strcpy(pHdr->name, name);
//
// SRpcMsg rpcMsg = {0};
// int32_t bytes = sizeof(SChekpointDataHeader) + nread;
// rpcMsg.pCont = rpcMallocCont(bytes);
// rpcMsg.msgType = TDMT_SYNC_SNAPSHOT_SEND;
// rpcMsg.contLen = bytes;
// if (rpcMsg.pCont == NULL) {
// tqError("uploadCheckpoint malloc failed");
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// return -1;
// }
// memcpy(rpcMsg.pCont, buf, bytes);
// int try = 3;
// int32_t code = 0;
// while(try-- > 0){
// code = tmsgSendReq(pEpSet, &rpcMsg);
// if(code == 0)
// break;
// taosMsleep(10);
// }
// if(code != 0){
// tqError("uploadCheckpoint send request failed code:%d", code);
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// return -1;
// }
//
// if(offset == 0){
// break;
// }
// }
// }
//
// taosMemoryFree(buf);
static int uploadCheckpointToS3(char* id, char* path) {
TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1;
//}
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
char filename[PATH_MAX] = {0};
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
snprintf(filename, sizeof(filename), "%s%s", path, name);
} else {
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
}
char object[PATH_MAX] = {0};
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
if (s3PutObjectFromFile2(filename, object) != 0) {
taosCloseDir(&pDir);
return -1;
}
stDebug("[s3] upload checkpoint:%s", filename);
}
taosCloseDir(&pDir);
return 0;
}
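uploadCheckpointToS3 walks the checkpoint directory, skips "." / ".." and sub-directories, and derives each object key as "<id>/<file name>". A simplified POSIX analogue of that walk, with a stub standing in for the real object upload:

#include <dirent.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>

// stub for the real s3PutObjectFromFile2()-style upload call
static int putObject(const char* localFile, const char* objectKey) {
  printf("upload %s -> %s\n", localFile, objectKey);
  return 0;
}

static int uploadDirToStore(const char* id, const char* path) {
  DIR* dir = opendir(path);
  if (dir == NULL) return -1;

  struct dirent* de;
  int ret = 0;
  while ((de = readdir(dir)) != NULL) {
    if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
    if (de->d_type == DT_DIR) continue;  // note: d_type is not filled on every filesystem

    char file[PATH_MAX] = {0};
    char object[PATH_MAX] = {0};
    snprintf(file, sizeof(file), "%s/%s", path, de->d_name);
    snprintf(object, sizeof(object), "%s/%s", id, de->d_name);
    if (putObject(file, object) != 0) {
      ret = -1;
      break;
    }
  }
  closedir(dir);
  return ret;
}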
UPLOAD_TYPE getUploadType() {
if (strlen(tsSnodeAddress) != 0) {
return UPLOAD_RSYNC;
} else if (tsS3StreamEnabled) {
return UPLOAD_S3;
} else {
return UPLOAD_DISABLE;
}
}
int uploadCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("uploadCheckpoint parameters invalid");
return -1;
}
if (strlen(tsSnodeIp) != 0) {
uploadRsync(id, path);
// }else if(tsS3StreamEnabled){
if (strlen(tsSnodeAddress) != 0) {
return uploadRsync(id, path);
} else if (tsS3StreamEnabled) {
return uploadCheckpointToS3(id, path);
}
return 0;
}
@ -492,9 +443,10 @@ int downloadCheckpoint(char* id, char* path) {
stError("downloadCheckpoint parameters invalid");
return -1;
}
if (strlen(tsSnodeIp) != 0) {
downloadRsync(id, path);
// }else if(tsS3StreamEnabled){
if (strlen(tsSnodeAddress) != 0) {
return downloadRsync(id, path);
} else if (tsS3StreamEnabled) {
return s3GetObjectsByPrefix(id, path);
}
return 0;
}
@ -504,9 +456,18 @@ int deleteCheckpoint(char* id) {
stError("deleteCheckpoint parameters invalid");
return -1;
}
if (strlen(tsSnodeIp) != 0) {
deleteRsync(id);
// }else if(tsS3StreamEnabled){
if (strlen(tsSnodeAddress) != 0) {
return deleteRsync(id);
} else if (tsS3StreamEnabled) {
s3DeleteObjectsByPrefix(id);
}
return 0;
}
int deleteCheckpointFile(char* id, char* name) {
char object[128] = {0};
snprintf(object, sizeof(object), "%s/%s", id, name);
char* tmp = object;
s3DeleteObjects((const char**)&tmp, 1);
return 0;
}
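The three helpers above now share one selection rule: a configured tsSnodeAddress selects rsync, otherwise stream S3 is used when enabled, otherwise the transfer is a no-op. A standalone sketch of that dispatch with stub transports (hypothetical names, not the real module API):

#include <string.h>

typedef enum { XFER_DISABLED = -1, XFER_S3 = 0, XFER_RSYNC = 1 } XferType;

// mirrors getUploadType(): the snode (rsync) address takes precedence over S3
static XferType selectTransport(const char* snodeAddress, int s3Enabled) {
  if (snodeAddress != NULL && strlen(snodeAddress) != 0) return XFER_RSYNC;
  if (s3Enabled) return XFER_S3;
  return XFER_DISABLED;
}

// the two stubs stand in for uploadRsync()/uploadCheckpointToS3()
static int uploadViaRsync(const char* id, const char* path) { (void)id; (void)path; return 0; }
static int uploadViaS3(const char* id, const char* path)    { (void)id; (void)path; return 0; }

static int doUpload(const char* snodeAddress, int s3Enabled, const char* id, const char* path) {
  switch (selectTransport(snodeAddress, s3Enabled)) {
    case XFER_RSYNC: return uploadViaRsync(id, path);
    case XFER_S3:    return uploadViaS3(id, path);
    default:         return 0;  // transfers disabled, nothing to do
  }
}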

View File

@ -1007,7 +1007,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
info.msg.info = *pRpcInfo;
taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));

View File

@ -18,7 +18,8 @@
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define STREAM_RESULT_DUMP_THRESHOLD 300
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@ -48,10 +49,9 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
}
streamDispatchStreamBlock(pTask);
return code;
}
return 0;
return code;
}
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
@ -187,83 +187,118 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code;
}
int32_t streamScanHistoryData(SStreamTask* pTask) {
static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
int32_t code = TSDB_CODE_SUCCESS;
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
}
} else {
taosArrayDestroy(pRes);
}
return code;
}
static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
int32_t numOfBlocks = 0;
while (1) {
if (streamTaskShouldStop(pTask)) {
break;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
break;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
tstrerror(code));
continue;
}
// the results generated before the fill-history task was paused should be dispatched to the sink node
if (output == NULL) {
(*pFinish) = qStreamScanhistoryFinished(exec);
break;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
(*pSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
break;
}
}
}
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;
qSetStreamOpOpen(exec);
while (!finished) {
while (1) {
if (streamTaskShouldPause(pTask)) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
break;
stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr);
// quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr,
tstrerror(terrno));
continue;
}
int32_t size = 0;
int32_t numOfBlocks = 0;
while (1) {
if (streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}
streamScanHistoryDataImpl(pTask, pRes, &size, &finished);
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
taosMsleep(10000);
continue;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
stError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
continue;
}
// the results generated before the fill-history task was paused should be dispatched to the sink node
if (output == NULL) {
finished = qStreamRecoverScanFinished(exec);
break;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
break;
}
if(streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
}
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
taosArrayDestroy(pRes);
// dispatch the generated results
int32_t code = handleResultBlocks(pTask, pRes, size);
int64_t el = taosGetTimestampMs() - st;
// downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
}
if (finished) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
}
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms",
pTask->id.idStr, pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
}
}
return 0;
}
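streamScanHistoryData now returns a (result, idle time) pair so the caller decides whether to quit, continue with step 2, or re-schedule the scan after an idle period. A compact standalone sketch of this time-sliced loop, with stubs for the scan step, the queue check and the clock:

#include <stdbool.h>
#include <stdint.h>

typedef enum { WORK_CONT = 1, WORK_QUIT = 2, WORK_REEXEC = 3 } WorkRet;
typedef struct { WorkRet ret; int idleMs; } WorkResult;

#define TIME_SLICE_MS 1000  // yield the worker thread after roughly one second

// stubs standing in for the real scan batch, downstream-queue check and clock
static bool doOneBatch(bool* finished) { *finished = true; return true; }
static bool outputBlocked(void) { return false; }
static int64_t nowMs(void) { return 0; }

static WorkResult runTimeSliced(int64_t startMs) {
  bool finished = false;
  while (1) {
    if (!doOneBatch(&finished)) {
      return (WorkResult){WORK_QUIT, 0};       // the task was asked to stop
    }
    if (outputBlocked()) {
      return (WorkResult){WORK_REEXEC, 5000};  // retry after the queue drains
    }
    if (finished) {
      return (WorkResult){WORK_CONT, 0};       // step 1 done, move on
    }
    if (nowMs() - startMs >= TIME_SLICE_MS) {
      return (WorkResult){WORK_REEXEC, 100};   // time slice exhausted, resume shortly
    }
  }
}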
// wait for the stream task to be idle
@ -273,7 +308,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
int64_t st = taosGetTimestampMs();
while (!streamTaskIsIdle(pStreamTask)) {
stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
pStreamTask->id.idStr);
pStreamTask->id.idStr);
taosMsleep(100);
}
@ -647,23 +682,25 @@ int32_t streamExecTask(SStreamTask* pTask) {
int32_t streamTaskReleaseState(SStreamTask* pTask) {
stDebug("s-task:%s release exec state", pTask->id.idStr);
void* pExecutor = pTask->exec.pExecutor;
int32_t code = TSDB_CODE_SUCCESS;
if (pExecutor != NULL) {
int32_t code = qStreamOperatorReleaseState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
code = qStreamOperatorReleaseState(pExecutor);
}
return code;
}
int32_t streamTaskReloadState(SStreamTask* pTask) {
stDebug("s-task:%s reload exec state", pTask->id.idStr);
void* pExecutor = pTask->exec.pExecutor;
int32_t code = TSDB_CODE_SUCCESS;
if (pExecutor != NULL) {
int32_t code = qStreamOperatorReloadState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
code = qStreamOperatorReloadState(pExecutor);
}
return code;
}
int32_t streamAlignTransferState(SStreamTask* pTask) {

View File

@ -301,6 +301,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pReadyTaskSet == NULL) {
goto _err;
}
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pFailedTaskSet == NULL) {
goto _err;
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
@ -362,6 +368,7 @@ _err:
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
taosMemoryFree(pMeta);
stError("failed to open stream meta");
@ -369,12 +376,8 @@ _err:
}
int32_t streamMetaReopen(SStreamMeta* pMeta) {
// backup the restart flag
int32_t restartFlag = pMeta->startInfo.startAllTasksFlag;
streamMetaClear(pMeta);
pMeta->startInfo.startAllTasksFlag = restartFlag;
// NOTE: the role should not be changed while reopening the meta
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
@ -442,7 +445,10 @@ void streamMetaClear(SStreamMeta* pMeta) {
pMeta->numOfStreamTasks = 0;
pMeta->numOfPausedTasks = 0;
streamMetaResetStartInfo(&pMeta->startInfo);
// the tasksWillRestart/starting flags can NOT be cleared here
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.readyTs = 0;
}
void streamMetaClose(SStreamMeta* pMeta) {
@ -484,6 +490,7 @@ void streamMetaCloseImpl(void* arg) {
// taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
taosMemoryFree(pMeta->pHbInfo);
taosMemoryFree(pMeta->path);
@ -1241,8 +1248,11 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) {
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
taosHashClear(pStartInfo->pReadyTaskSet);
pStartInfo->startAllTasksFlag = 0;
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0;
// reset the sentinel flag value to be 0
atomic_store_32(&pStartInfo->taskStarting, 0);
}
void streamMetaRLock(SStreamMeta* pMeta) {

View File

@ -159,7 +159,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id);
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
taosMsleep(10);
return TSDB_CODE_SUCCESS;
}
@ -340,10 +341,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0;
}
// the result should be put into the outputQ in any cases, otherwise, the result may be lost
// the result should be put into the outputQ in all cases; otherwise the result may be lost.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputq.queue->pQueue;
// wait until the output queue has room for the new data to dispatch
while (streamQueueIsFull(pTask->outputq.queue)) {
if (streamTaskShouldStop(pTask)) {
stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
@ -373,7 +375,8 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) {
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
const char* id) {
if (numCap < 10 || numRate < 10 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
return TSDB_CODE_INVALID_PARA;
@ -388,6 +391,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->quotaRemain = pBucket->quotaCapacity;
pBucket->fillTimestamp = taosGetTimestampMs();
stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
return TSDB_CODE_SUCCESS;
}
@ -406,12 +410,12 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
double incSize = (delta / 1000.0) * pBucket->quotaRate;
if (incSize > 0) {
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
pBucket->fillTimestamp = now;
}
if (incNum > 0 || incSize > 0) {
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
" idle for %.2f Sec, %s",
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id);
stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s",
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id);
}
}
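The sink bucket refills along two axes: whole tokens at numRate per second and a byte quota at quotaRate MiB per second, each capped at its capacity. A minimal standalone token-bucket sketch along those lines (simplified, hypothetical names):

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t numOfToken, numCapacity, numRate;       // tokens, refill rate per second
  double  quotaRemain, quotaCapacity, quotaRate;  // MiB, refill rate per second
  int64_t fillTimestamp;                          // last refill time in ms
} Bucket;

static void refill(Bucket* b, int64_t nowMs) {
  int64_t delta = nowMs - b->fillTimestamp;
  int32_t incNum = (int32_t)((delta / 1000.0) * b->numRate);
  if (incNum > 0) {
    int32_t n = b->numOfToken + incNum;
    b->numOfToken = (n < b->numCapacity) ? n : b->numCapacity;
    b->fillTimestamp = nowMs;
  }
  double incSize = (delta / 1000.0) * b->quotaRate;
  if (incSize > 0) {
    double q = b->quotaRemain + incSize;
    b->quotaRemain = (q < b->quotaCapacity) ? q : b->quotaCapacity;
    b->fillTimestamp = nowMs;
  }
}

// consume one token before emitting a sink batch; false means wait and retry
static bool tryConsumeToken(Bucket* b, int64_t nowMs) {
  refill(b, nowMs);
  if (b->numOfToken <= 0) return false;
  b->numOfToken -= 1;
  return true;
}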

View File

@ -19,6 +19,10 @@
#include "wal.h"
#include "streamsm.h"
#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms
#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)
typedef struct SLaunchHTaskInfo {
SStreamMeta* pMeta;
STaskId id;
@ -30,6 +34,12 @@ typedef struct STaskRecheckInfo {
void* checkTimer;
} STaskRecheckInfo;
typedef struct STaskInitTs {
int64_t start;
int64_t end;
bool success;
} STaskInitTs;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
@ -57,7 +67,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p);
streamMetaUpdateTaskReadyInfo(pTask);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
return TSDB_CODE_SUCCESS;
}
@ -81,6 +91,60 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0;
}
static void doReExecScanhistory(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
streamMetaReleaseTask(pTask->pMeta, pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer,
&pTask->schedHistoryInfo.pTimer);
}
}
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
numOfTicks = 1;
} else if (numOfTicks > SCANHISTORY_IDLE_TICK) {
numOfTicks = SCANHISTORY_IDLE_TICK;
}
// add ref for task
SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
ASSERT(p != NULL);
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
}
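The idle duration handed back by the scan step is converted into 100 ms timer ticks and clamped to the 10 s maximum before the timer is armed. A small worked sketch of that arithmetic:

#define IDLE_TIME_SLICE_MS 100                                  // one timer tick
#define MAX_IDLE_SEC       10
#define MAX_IDLE_TICK      ((MAX_IDLE_SEC * 1000) / IDLE_TIME_SLICE_MS)

// e.g. 5000 ms -> 50 ticks, 30000 ms -> clamped to 100 ticks (10 s),
// and anything under one slice still waits for a single tick
static int computeTicks(int idleDurationMs) {
  int ticks = idleDurationMs / IDLE_TIME_SLICE_MS;
  if (ticks <= 0) ticks = 1;
  if (ticks > MAX_IDLE_TICK) ticks = MAX_IDLE_TICK;
  return ticks;
}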
static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
@ -318,6 +382,31 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
}
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == nodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = nodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
}
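addIntoNodeUpdateList performs the existence check and the append under one task lock so the same nodeId cannot be queued twice by concurrent responses. A standalone sketch of that guarded dedup append, using a plain array and a pthread mutex (hypothetical names):

#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_NODES 64

typedef struct {
  pthread_mutex_t lock;
  int32_t         nodes[MAX_NODES];
  int32_t         num;
} NodeUpdateList;

// the check-then-insert must stay under a single lock, otherwise two threads
// could both miss the entry and append the same nodeId twice
static bool addNodeOnce(NodeUpdateList* l, int32_t nodeId) {
  bool added = false;
  pthread_mutex_lock(&l->lock);
  bool existed = false;
  for (int32_t i = 0; i < l->num; ++i) {
    if (l->nodes[i] == nodeId) { existed = true; break; }
  }
  if (!existed && l->num < MAX_NODES) {
    l->nodes[l->num++] = nodeId;
    added = true;
  }
  pthread_mutex_unlock(&l->lock);
  return added;
}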
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;
@ -367,40 +456,23 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
doProcessDownstreamReadyRsp(pTask);
}
} else { // not ready, wait for 100ms and retry
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError(
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
"not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
} else {
if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError(
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
"not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
} else {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == pRsp->downstreamNodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
return 0;
}
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
@ -676,9 +748,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
int32_t hTaskId = pHTaskInfo->id.taskId;
streamTaskGetStatus(pTask, &p);
stDebug(
"s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
@ -975,28 +1046,57 @@ void streamTaskEnablePause(SStreamTask* pTask) {
pTask->status.pauseAllowed = 1;
}
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;
size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
STaskInitTs* pInfo = pIter;
void* key = taosHashGetKey(pIter, &keyLen);
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
if (pTask1 == NULL) {
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
} else {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
}
}
}
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) {
SStreamMeta* pMeta = pTask->pMeta;
streamMetaWLock(pMeta);
STaskId id = streamTaskExtractKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
STaskStartInfo* pStartInfo = &pMeta->startInfo;
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
pStartInfo->readyTs = pTask->execInfo.start;
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
streamMetaResetStartInfo(pStartInfo);
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
}
streamMetaWUnLock(pMeta);
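A start round now completes once every task has landed in either the ready set or the failed set, rather than waiting for all of them to become ready. A distilled sketch of that accounting using plain counters instead of the hash tables (hypothetical names):

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t numReady;
  int32_t numFailed;
  int32_t numTotal;
} StartRound;

// record one task outcome; returns true when every task has reported either
// success or failure, which is when the caller prints the summary and resets
static bool recordTaskResult(StartRound* s, bool ready) {
  if (ready) {
    s->numReady += 1;
  } else {
    s->numFailed += 1;
  }
  return (s->numReady + s->numFailed) == s->numTotal;
}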

View File

@ -401,8 +401,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock);
taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
pTask->outputInfo.pDownstreamUpdateList = NULL;
pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
@ -449,7 +448,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task
// the sink operator is invoked at most 50 times per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate);
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);

View File

@ -315,7 +315,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockx", pTask->id.idStr);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}

View File

@ -27,11 +27,18 @@
#include "rsync.h"
#include "streamInt.h"
#include "cos.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
strcpy(tsSnodeIp, "127.0.0.1");
if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
printf("error");
}
if (s3Init() < 0) {
return -1;
}
strcpy(tsSnodeAddress, "127.0.0.1");
return RUN_ALL_TESTS();
}
@ -42,15 +49,20 @@ TEST(testCase, checkpointUpload_Test) {
taosSsleep(5);
char* id = "2013892036";
uploadCheckpoint(id, "/Users/mingmingwanng/rsync/");
uploadCheckpoint(id, "/root/offset/");
}
TEST(testCase, checkpointDownload_Test) {
char* id = "2013892036";
downloadRsync(id, "/Users/mingmingwanng/rsync/tmp");
downloadCheckpoint(id, "/root/offset/download/");
}
TEST(testCase, checkpointDelete_Test) {
char* id = "2013892036";
deleteRsync(id);
deleteCheckpoint(id);
}
TEST(testCase, checkpointDeleteFile_Test) {
char* id = "2013892036";
deleteCheckpointFile(id, "offset-ver0");
}

View File

@ -865,6 +865,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
if (pMsg->seq > pRcvBuf->cursor) {
if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
}
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
@ -1002,7 +1005,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
}
if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
pMsg->seq);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
}
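The added entryDeleteCb call frees whatever message already occupies a ring-buffer slot before the slot is overwritten, so a retransmitted sequence number does not leak the earlier allocation. A tiny standalone sketch of that pattern, with malloc/free standing in for the real entry callbacks:

#include <stdint.h>
#include <stdlib.h>

#define RING_SIZE 8

typedef struct {
  void* entries[RING_SIZE];
} Ring;

// place msg at the slot for seq; if the slot is already occupied,
// release the old entry first so it is not leaked
static void ringPut(Ring* r, int64_t seq, void* msg) {
  int idx = (int)(seq % RING_SIZE);
  if (r->entries[idx] != NULL) {
    free(r->entries[idx]);  // entryDeleteCb in the real receiver
  }
  r->entries[idx] = msg;
}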

View File

@ -268,7 +268,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
", seq:%d, ack:%d, "
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
"), finish:%d, as:%d, to-dnode:%d}"

View File

@ -19,6 +19,7 @@ exe:
gcc $(CFLAGS) ./whiteListTest.c -o $(ROOT)whiteListTest $(LFLAGS)
gcc $(CFLAGS) ./insert_stb.c -o $(ROOT)insert_stb $(LFLAGS)
gcc $(CFLAGS) ./tmqViewTest.c -o $(ROOT)tmqViewTest $(LFLAGS)
gcc $(CFLAGS) ./stmtQuery.c -o $(ROOT)stmtQuery $(LFLAGS)
clean:
rm $(ROOT)batchprepare
@ -29,3 +30,4 @@ clean:
rm $(ROOT)whiteListTest
rm $(ROOT)insert_stb
rm $(ROOT)tmqViewTest
rm $(ROOT)stmtQuery

View File

@ -0,0 +1,129 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include "../../../include/client/taos.h"
#define PRINT_ERROR printf("\033[31m");
#define PRINT_SUCCESS printf("\033[32m");
void execute_simple_sql(void *taos, char *sql) {
TAOS_RES *result = taos_query(taos, sql);
if ( result == NULL || taos_errno(result) != 0) {
PRINT_ERROR
printf("failed to %s, Reason: %s\n", sql, taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
PRINT_SUCCESS
printf("Successfully %s\n", sql);
}
void check_result(TAOS_RES *result, int expected) {
int rows = 0;
TAOS_ROW row;
while ((row = taos_fetch_row(result))) {
rows++;
}
if (rows == expected) {
PRINT_SUCCESS
printf("%d rows are fetched as expected\n", rows);
} else {
PRINT_ERROR
printf("%d rows are fetched but %d expected\n", rows, expected);
}
taos_free_result(result);
}
int main(int argc, char *argv[]) {
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL) {
PRINT_ERROR
printf("TDengine error: failed to connect\n");
exit(EXIT_FAILURE);
}
PRINT_SUCCESS
printf("Successfully connected to TDengine\n");
execute_simple_sql(taos, "drop database if exists test");
execute_simple_sql(taos, "create database test");
execute_simple_sql(taos, "use test");
execute_simple_sql(taos, "create table super(ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(8), c6 smallint, c7 tinyint, c8 bool, c9 nchar(8), c10 timestamp) tags (t1 int, t2 bigint, t3 float, t4 double, t5 binary(8), t6 smallint, t7 tinyint, t8 bool, t9 nchar(8))");
char *sql = calloc(1, 1024*1024);
int sqlLen = 0;
sqlLen = sprintf(sql, "create table");
for (int i = 0; i < 10; i++) {
sqlLen += sprintf(sql + sqlLen, " t%d using super tags (%d, 2147483648, 0.1, 0.000000001, 'abcdefgh', 32767, 127, 1, '一二三四五六七八')", i, i);
}
execute_simple_sql(taos, sql);
strcpy(sql, "insert into t0 (ts, c1) values(now, 1)");
execute_simple_sql(taos, sql);
strcpy(sql, "insert into t0 (ts, c1) values(now, 2)");
execute_simple_sql(taos, sql);
strcpy(sql, "insert into t0 (ts, c1) values(now, 3)");
execute_simple_sql(taos, sql);
int code = taos_load_table_info(taos, "t0,t1,t2,t3,t4,t5,t6,t7,t8,t9");
if (code != 0) {
PRINT_ERROR
printf("failed to load table info: 0x%08x\n", code);
exit(EXIT_FAILURE);
}
PRINT_SUCCESS
printf("Successfully load table info\n");
TAOS_STMT *stmt = taos_stmt_init(taos);
if (stmt == NULL) {
PRINT_ERROR
printf("TDengine error: failed to init taos_stmt\n");
exit(EXIT_FAILURE);
}
PRINT_SUCCESS
printf("Successfully init taos_stmt\n");
char* condBuf = "2 or c1 > 0";
TAOS_MULTI_BIND params[1];
params[0].buffer_type = TSDB_DATA_TYPE_BINARY;
params[0].buffer_length = strlen(condBuf) + 1;
params[0].buffer = condBuf;
params[0].length = (int*)&params[0].buffer_length;
params[0].is_null = NULL;
params[0].num = 1;
char *stmt_sql = "select * from super where c1 > ?";
code = taos_stmt_prepare(stmt, stmt_sql, 0);
if (code != 0){
PRINT_ERROR
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
exit(EXIT_FAILURE);
}
PRINT_SUCCESS
printf("Successfully execute taos_stmt_prepare\n");
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
if (taos_stmt_execute(stmt) != 0) {
PRINT_ERROR
printf("failed to execute query statement.\n");
exit(EXIT_FAILURE);
}
PRINT_SUCCESS
printf("Successfully execute query statement.\n");
TAOS_RES *result = taos_stmt_use_result(stmt);
check_result(result, 1);
taos_stmt_close(stmt);
return 0;
}

View File

@ -70,6 +70,7 @@ sql_error insert into d2.st values(now, 1, 1)
sql_error insert into d2.st(ts, f) values(now, 1);
sql_error insert into d2.st(ts, f, tbname) values(now, 1);
sql_error insert into d2.st(ts, f, tbname) values(now, 1, '');
sql_error insert into d2.st(ts, f, tbname) values(now, 1, 'd2.ct2');
sql_error insert into d2.st(ts, tbname) values(now, 1, 34)
sql_error insert into st using st2 tags(2) values(now,1);
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -164,4 +164,9 @@ sql select count(*) from (select tags ts from stb34)
if $data00 != 2 then
return -1
endi
sql select tags 2 from stb34
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT