Merge branch '3.0' into enh/TD-23769-3.0x

This commit is contained in:
kailixu 2023-11-03 16:26:07 +08:00
commit 77eadc59dc
39 changed files with 868 additions and 562 deletions

View File

@ -539,7 +539,8 @@ TO_CHAR(ts, format_str_literal)
- When `ms`, `us`, and `ns` are used in `to_char`, as in `to_char(ts, 'yyyy-mm-dd hh:mi:ss.ms.us.ns')`, all three represent the same fractional seconds. When ts is `1697182085123`, the output of `ms` is `123`, `us` is `123000`, and `ns` is `123000000`.
- To output characters of the format string literally without conversion, surround them with double quotes, as in `to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. To output a double quote itself, add a backslash before it: `to_char(ts, '\"yyyy-mm-dd\"')` outputs `"2023-10-10"`.
- For formats that output digits, the uppercase and lowercase formats are the same.
- It's recommended to put time zone in the format, if not, the default time zone zone will be that in server or client.
- It's recommended to put time zone in the format, if not, the default time zone will be that in server or client.
- The precision of the input timestamp is recognized automatically according to the precision of the table used; milliseconds are used if no table is specified.
#### TO_TIMESTAMP
@ -564,9 +565,10 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
- `MONTH`, `MON`, `DAY`, `DY`, and formats that output digits are case-insensitive when used in `to_timestamp`: in `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month` can be replaced by `MONTH` or `Month`.
- If one component is specified multiple times, the earlier value is overwritten. For `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, the output year is `2022`.
- To avoid an unexpected time zone being used during the conversion, it is recommended to put the time zone in the ts string, e.g. '2023-10-10 10:10:10+08'. If no time zone is specified, the default is the one configured on the server or client.
- The default timestamp if some components are not specified will be: `1970-01-01 00:00:00` with specified or default local timezone.
- The default timestamp if some components are not specified will be: `1970-01-01 00:00:00` with the timezone specified or default to local timezone.
- If `AM` or `PM` is specified in the format, the hour must be in the 12-hour range `1-12`.
- In some cases, `to_timestamp` converts successfully even when the format and the timestamp string do not match exactly. For `to_timestamp('200101/2', 'yyyyMM1/dd')`, the extra digit `1` in the format string is ignored, and the output timestamp is `2001-01-02 00:00:00`. Spaces and tabs in the format and timestamp strings are also ignored automatically.
- The precision of the output timestamp is the same as that of the table in the SELECT statement; millisecond precision is used if no table is specified. The output of `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')` is truncated to millisecond precision. If a nanosecond-precision table is specified, no truncation is applied, e.g. `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`. A client-side sketch of both functions follows.
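For a quick end-to-end check of the two functions above, here is a minimal client-side sketch using the TDengine C API (`taos_connect`/`taos_query`); the host, credentials, and port are placeholders for your deployment:

```c
#include <inttypes.h>
#include <stdio.h>
#include <taos.h>

int main(void) {
  // Placeholder connection parameters; adjust for your deployment.
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (conn == NULL) return 1;

  // Round-trip: format the current time with to_char, then parse it back.
  TAOS_RES *res = taos_query(conn,
      "select to_timestamp(to_char(now, 'yyyy-mm-dd hh24:mi:ss'), "
      "'yyyy-mm-dd hh24:mi:ss')");
  if (taos_errno(res) != 0) {
    fprintf(stderr, "query failed: %s\n", taos_errstr(res));
  } else {
    TAOS_ROW row = taos_fetch_row(res);
    // With no table in the query, the result uses millisecond precision.
    if (row != NULL) printf("parsed ts: %" PRId64 " ms\n", *(int64_t *)row[0]);
  }
  taos_free_result(res);
  taos_close(conn);
  return 0;
}
```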
### Time and Date Functions

View File

@ -540,6 +540,7 @@ TO_CHAR(ts, format_str_literal)
- Content in the format string that matches no rule is output as-is. To keep parts that would otherwise match a rule from being converted, surround them with double quotes, e.g. `to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. To output a double quote itself, add a backslash before it: `to_char(ts, '\"yyyy-mm-dd\"')` outputs `"2023-10-10"`.
- For formats that output digits, such as `YYYY` and `DD`, uppercase and lowercase have the same meaning, i.e. `yyyy` and `YYYY` are interchangeable.
- It is recommended to include time zone information in the format; otherwise the output defaults to the time zone configured on the server or client.
- The precision of the input timestamp is determined by the precision of the queried table; if no table is specified, millisecond precision is used.
#### TO_TIMESTAMP
@ -560,13 +561,14 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
**Supported Formats**: the same as `to_char`.
**Usage Notes**:
- If `ms`, `us`, and `ns` are all specified, the resulting timestamp includes the sum of the three fields. For example, `to_timestamp('2023-10-10 10:10:10.123.000456.000000789', 'yyyy-mm-dd hh:mi:ss.ms.us.ns')` outputs `2023-10-10 10:10:10.123456789`.
- If `ms`, `us`, and `ns` are all specified, the resulting timestamp includes the sum of the three fields. For example, `to_timestamp('2023-10-10 10:10:10.123.000456.000000789', 'yyyy-mm-dd hh:mi:ss.ms.us.ns')` outputs the timestamp corresponding to `2023-10-10 10:10:10.123456789`.
- `MONTH`, `MON`, `DAY`, `DY`, and other formats that output digits are case-insensitive, e.g. in `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month` can be replaced by `MONTH` or `Month`.
- If the same component is specified multiple times, the earlier value is overwritten. For `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, the output year is `2022`.
- To avoid an unexpected time zone being used during conversion, it is recommended to carry time zone information in the time string, e.g. '2023-10-10 10:10:10+08'; if no time zone is specified, the default is the time zone configured on the server or client.
- If the full time is not specified, the default time value is `1970-01-01 00:00:00` in the specified or default time zone, and unspecified components take the corresponding parts of this default value.
- If the format string contains `AM`, `PM`, etc., the hour must use 12-hour notation, in the range 01-12.
- `to_timestamp` has a degree of fault tolerance and can sometimes convert even when the format string and the timestamp string do not correspond exactly, e.g. in `to_timestamp('200101/2', 'yyyyMM1/dd')` the extra `1` in the format string is discarded. Extra whitespace characters (spaces, tabs, etc.) in the format and timestamp strings are also ignored automatically, so `to_timestamp(' 23 年 - 1 月 - 01 日 ', 'yy 年-MM月-dd日')` converts successfully. And although fields such as `MM` normally require two matching digits (padded with a leading 0 when there is only one), a single digit also converts successfully in `to_timestamp`.
- The precision of the output timestamp is the same as that of the queried table; if the query specifies no table, the output precision is millisecond. For example, the output of `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')` truncates the microseconds and nanoseconds. If a nanosecond table is specified, no truncation occurs, e.g. `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`.
### Time and Date Functions

View File

@ -762,7 +762,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamRestoreParam(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
@ -807,6 +806,10 @@ void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
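The lock wrappers added above are meant to bracket short critical sections on the stream meta. A minimal sketch, assuming a valid `SStreamMeta *` (the helper name `numTasksSnapshot` is hypothetical; the pattern mirrors the call sites updated later in this commit):

```c
// Hypothetical helper showing the read-lock pattern the new wrappers imply.
static int32_t numTasksSnapshot(SStreamMeta *pMeta) {
  streamMetaRLock(pMeta);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaRUnLock(pMeta);
  return numOfTasks;
}
```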

View File

@ -752,7 +752,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
#define TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
#define TSDB_CODE_FUNC_DUP_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x2805)
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED TAOS_DEF_ERROR_CODE(0, 0x2806)
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR TAOS_DEF_ERROR_CODE(0, 0x2806)
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR TAOS_DEF_ERROR_CODE(0, 0x2807)
//udf
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)
@ -819,6 +820,8 @@ int32_t* taosGetErrno();
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -128,7 +128,7 @@ char tsSmlTsDefaultName[TSDB_COL_NAME_LEN] = "_ts";
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
// If set to empty system will generate table name using MD5 hash.
// If set to empty system will generate table name using MD5 hash.
// true means that the name and order of cols in each line are the same(only for influx protocol)
// bool tsSmlDataFormat = false;
// int32_t tsSmlBatchSize = 10000;
@ -280,8 +280,10 @@ int8_t tsS3Enabled = false;
int8_t tsS3Https = true;
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
int32_t tsS3BlockSize = 4096; // number of tsdb pages
int32_t tsS3BlockSize = -1; // number of tsdb pages (4096)
int32_t tsS3BlockCacheSize = 16; // number of blocks
int32_t tsS3PageCacheSize = 4096; // number of pages
int32_t tsS3UploadDelaySec = 60 * 60;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
@ -460,7 +462,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0)
return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1;
@ -675,7 +678,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1;
@ -693,8 +697,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0)
return -1;
// min free disk space used to check if the disk is full [50MB, 1GB]
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
@ -953,7 +960,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
return -1;
}
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN);
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str,
TSDB_TABLE_NAME_LEN);
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
@ -1125,6 +1133,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
tsS3PageCacheSize = cfgGetItem(pCfg, "s3PageCacheSize")->i32;
tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32;
GRANT_CFG_GET;
return 0;
@ -1422,7 +1432,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
} else if (strcasecmp("smlChildTableName", name) == 0) {
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
} else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) {
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN);
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str,
TSDB_TABLE_NAME_LEN);
} else if (strcasecmp("smlTagName", name) == 0) {
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
// } else if (strcasecmp("smlDataFormat", name) == 0) {
@ -1717,6 +1728,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
return;
}
if (strcasecmp(option, "s3PageCacheSize") == 0) {
int32_t newS3PageCacheSize = atoi(value);
uInfo("s3PageCacheSize set from %d to %d", tsS3PageCacheSize, newS3PageCacheSize);
tsS3PageCacheSize = newS3PageCacheSize;
return;
}
if (strcasecmp(option, "s3UploadDelaySec") == 0) {
int32_t newS3UploadDelaysec = atoi(value);
uInfo("s3UploadDelaySec set from %d to %d", tsS3UploadDelaySec, newS3UploadDelaysec);
tsS3UploadDelaySec = newS3UploadDelaysec;
return;
}
if (strcasecmp(option, "ttlPushInterval") == 0) {
int32_t newTtlPushInterval = atoi(value);
uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval);

View File

@ -1320,7 +1320,7 @@ static void tm2char(const SArray* formats, const struct STm* tm, char* s, int32_
s += 4;
break;
case TSFKW_DDD:
sprintf(s, "%d", tm->tm.tm_yday);
sprintf(s, "%03d", tm->tm.tm_yday + 1);
s += strlen(s);
break;
case TSFKW_DD:
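The `DDD` change above reflects `struct tm` semantics: `tm_yday` counts from 0, while the day-of-year format is 1-based and zero-padded to three digits. A standalone sketch of the same conversion:

```c
#include <stdio.h>
#include <time.h>

int main(void) {
  time_t now = time(NULL);
  struct tm tm;
  localtime_r(&now, &tm);
  // tm_yday is 0-based, so January 1st must print as "001", not "0".
  printf("DDD = %03d\n", tm.tm_yday + 1);
  return 0;
}
```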

View File

@ -344,7 +344,7 @@ TEST(timeTest, ts2char) {
"day-\"\"",
TSDB_TIME_PRECISION_MILLI,
"2023-023-23-3-2023-023-23-3-年-OCTOBER -OCT-October -Oct-october "
"-oct-月-285-13-6-285-13-6-FRIDAY -Friday -friday -日");
"-oct-月-286-13-6-286-13-6-FRIDAY -Friday -friday -日");
#endif
ts = 1697182085123L; // Friday, October 13, 2023 3:28:05.123 PM GMT+08:00
test_ts2char(ts, "HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,

View File

@ -1228,6 +1228,70 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy(dcfgReq.config, "monitor");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
int32_t optLen = strlen("s3blocksize");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag > 1024 * 1024) {
mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
strcpy(dcfgReq.config, "s3blocksize");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "s3blockcachesize", 16) == 0) {
int32_t optLen = strlen("s3blockcachesize");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 4 || flag > 1024 * 1024) {
mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]",
cfgReq.dnodeId, flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
strcpy(dcfgReq.config, "s3blockcachesize");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "s3pagecachesize", 16) == 0) {
int32_t optLen = strlen("s3pagecachesize");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 4 || flag > 1024 * 1024 * 1024) {
mError("dnode:%d, failed to config s3PageCacheSize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
strcpy(dcfgReq.config, "s3pagecachesize");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "s3uploaddelaysec", 16) == 0) {
int32_t optLen = strlen("s3uploaddelaysec");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 600 || flag > 60 * 60 * 24 * 30) {
mError("dnode:%d, failed to config s3UploadDelaySec since value:%d. Valid range: [600, 60 * 60 * 24 * 30]",
cfgReq.dnodeId, flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
strcpy(dcfgReq.config, "s3uploaddelaysec");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
int32_t optLen = strlen("ttlpushinterval");
int32_t flag = -1;

View File

@ -165,17 +165,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
streamMetaWLock(pSnode->pMeta);
bool added = false;
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added);
if (code < 0) {
taosWUnLockLatch(&pSnode->pMeta->lock);
streamMetaWUnLock(pSnode->pMeta);
return -1;
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock);
streamMetaWUnLock(pSnode->pMeta);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
@ -195,14 +195,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
// commit the update
taosWLockLatch(&pSnode->pMeta->lock);
streamMetaWLock(pSnode->pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
if (streamMetaCommit(pSnode->pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pSnode->pMeta->lock);
streamMetaWUnLock(pSnode->pMeta);
return 0;
}

View File

@ -382,6 +382,8 @@ struct STsdb {
TdThreadMutex biMutex;
SLRUCache *bCache;
TdThreadMutex bMutex;
SLRUCache *pgCache;
TdThreadMutex pgMutex;
struct STFileSystem *pFS; // new
SRocksCache rCache;
};
@ -909,7 +911,9 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle);
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);

View File

@ -27,6 +27,8 @@ extern "C" {
extern int8_t tsS3Enabled;
extern int32_t tsS3BlockSize;
extern int32_t tsS3BlockCacheSize;
extern int32_t tsS3PageCacheSize;
extern int32_t tsS3UploadDelaySec;
int32_t s3Init();
void s3CleanUp();

View File

@ -1023,10 +1023,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
int64_t streamId = pTask->id.streamId;
bool added = false;
taosWLockLatch(&pStreamMeta->lock);
streamMetaWLock(pStreamMeta);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
taosWUnLockLatch(&pStreamMeta->lock);
streamMetaWUnLock(pStreamMeta);
if (code < 0) {
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
@ -1064,6 +1064,47 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
return code;
}
static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
// if it's a source task, extract the last version in the wal.
SVersionRange *pRange = &pTask->dataRange.range;
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
streamExecTask(pTask); // exec directly
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history from WAL after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
int64_t dstVer = pTask->dataRange.range.minVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files.
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) {
tqScanWalAsync(pTq, false);
}
}
}
// this function should be executed by only one thread
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
@ -1126,10 +1167,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
if (pTask->info.fillHistory == 1) {
ASSERT(pTask->status.pauseAllowed == true);
}
streamScanHistoryData(pTask);
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
@ -1146,18 +1183,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) {
SVersionRange* pRange = NULL;
SStreamTask* pStreamTask = NULL;
// 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s",
tqError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1166,112 +1200,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
#if 0
// 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
// stream task get ready for scan history data
while (streamTaskGetStatus(pStreamTask, NULL) == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
taosMsleep(100);
}
// now we can stop the stream task execution
int64_t nextProcessedVer = 0;
while (1) {
taosThreadMutexLock(&pStreamTask->lock);
int8_t status = pStreamTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
// return; do nothing
}
if (status == TASK_STATUS__HALT) {
// tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
// pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
// latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
//
// taosThreadMutexUnlock(&pStreamTask->lock);
// break;
}
if (pStreamTask->status.taskStatus == TASK_STATUS__CK) {
qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt",
pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__CK));
taosThreadMutexUnlock(&pStreamTask->lock);
taosMsleep(1000);
continue;
}
// upgrade to halt status
if (status == TASK_STATUS__PAUSE) {
qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
streamGetTaskStatusStr(TASK_STATUS__PAUSE));
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code == TSDB_CODE_SUCCESS) {
doStartStep2(pTask, pStreamTask, pTq);
} else {
qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status));
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
}
pStreamTask->status.keepTaskStatus = status;
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
// wal scan not start yet, reset it to be the start position
nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
if (nextProcessedVer == -1) {
nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1;
}
tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s",
pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus,
id);
taosThreadMutexUnlock(&pStreamTask->lock);
break;
}
#endif
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
// streamTaskRestoreStatus(pTask);
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// pTask->status.keepTaskStatus = TASK_STATUS__READY;
// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
// }
streamExecTask(pTask); // exec directly
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history from WAL after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
int64_t dstVer = pTask->dataRange.range.minVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
tqScanWalAsync(pTq, false);
}
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
@ -1467,14 +1403,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
// commit the update
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return 0;
}
@ -1785,7 +1721,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
taosThreadMutexUnlock(&pTask->lock);
int32_t total = 0;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
@ -1794,7 +1730,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
}
total = pMeta->numOfStreamTasks;
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
@ -1865,7 +1801,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
// update the nodeEpset when it exists
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
@ -1874,7 +1810,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
@ -1883,7 +1820,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId;
tqDebug("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
} else {
@ -1896,19 +1833,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
req.transId);
rsp.code = TSDB_CODE_SUCCESS;
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
// continue after lock the meta again
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -1958,47 +1895,49 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
} else {
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.startAllTasksFlag = 0;
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
} else {
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
terrno = 0;
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
while (streamMetaTaskInTimer(pMeta)) {
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqLaunchStreamTaskAsync(pTq);
} else {
vInfo("vgId:%d, follower node not start stream tasks", vgId);
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
}
}

View File

@ -35,9 +35,9 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
tqProcessSubmitReqForSubscribe(pTq);
}
taosRLockLatch(&pTq->pStreamMeta->lock);
streamMetaRLock(pTq->pStreamMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosRUnLockLatch(&pTq->pStreamMeta->lock);
streamMetaRUnLock(pTq->pStreamMeta);
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);

View File

@ -1111,7 +1111,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
taosWUnLockLatch(&pTq->lock);
// update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);
streamMetaWLock(pTq->pStreamMeta);
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
if (pIter == NULL) {
@ -1128,6 +1128,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
streamMetaWUnLock(pTq->pStreamMeta);
return 0;
}

View File

@ -38,19 +38,17 @@ int32_t tqScanWal(STQ* pTq) {
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
if (shouldIdle) {
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
int32_t times = (--pMeta->walScanCounter);
ASSERT(pMeta->walScanCounter >= 0);
streamMetaWUnLock(pMeta);
if (pMeta->walScanCounter <= 0) {
taosWUnLockLatch(&pMeta->lock);
if (times <= 0) {
break;
}
taosWUnLockLatch(&pMeta->lock);
} else {
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
}
}
taosMsleep(SCAN_WAL_IDLE_DURATION);
}
@ -61,6 +59,7 @@ int32_t tqScanWal(STQ* pTq) {
}
int32_t tqStartStreamTask(STQ* pTq) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -71,11 +70,11 @@ int32_t tqStartStreamTask(STQ* pTq) {
}
SArray* pTaskList = NULL;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
@ -104,12 +103,16 @@ int32_t tqStartStreamTask(STQ* pTq) {
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return 0;
return code;
}
int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
@ -148,12 +151,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return TSDB_CODE_SUCCESS;
}
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return 0;
}
@ -164,7 +167,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return 0;
}
@ -174,7 +177,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
// reset the counter value, since we do not launch the scan wal operation.
pMeta->walScanCounter = 0;
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return 0;
}
@ -182,7 +185,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return -1;
}
@ -193,7 +196,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return 0;
}
@ -209,9 +212,9 @@ int32_t tqStopStreamTasks(STQ* pTq) {
}
SArray* pTaskList = NULL;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
@ -412,9 +415,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
// clone the task list, to avoid the task update during scan wal files
SArray* pTaskList = NULL;
taosWLockLatch(&pStreamMeta->lock);
streamMetaWLock(pStreamMeta);
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
taosWUnLockLatch(&pStreamMeta->lock);
streamMetaWUnLock(pStreamMeta);
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
@ -446,6 +449,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);
tqDebug("s-task:%s lock", pTask->id.idStr);
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);

View File

@ -87,6 +87,41 @@ static void tsdbCloseBCache(STsdb *pTsdb) {
}
}
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
int32_t code = 0;
// SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->pgMutex, NULL);
_err:
pTsdb->pgCache = pCache;
return code;
}
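For a sense of scale, with the default `tsS3PageCacheSize` of 4096 pages and a typical tsdb page size of 4096 bytes, the capacity passed to `taosLRUCacheInit` above works out to 16 MiB (a sketch; the actual page size comes from the vnode config):

```c
#include <stdint.h>
#include <stdio.h>

int main(void) {
  int32_t tsS3PageCacheSize = 4096;  // default from tglobal.c above
  int32_t szPage = 4096;             // assumed typical tsdbPageSize
  int64_t capacity = (int64_t)tsS3PageCacheSize * szPage;
  printf("page cache capacity: %lld bytes (%lld MiB)\n",
         (long long)capacity, (long long)(capacity >> 20));
  return 0;
}
```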
static void tsdbClosePgCache(STsdb *pTsdb) {
SLRUCache *pCache = pTsdb->pgCache;
if (pCache) {
int32_t elems = taosLRUCacheGetElems(pCache);
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
taosLRUCacheEraseUnrefEntries(pCache);
elems = taosLRUCacheGetElems(pCache);
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
taosLRUCacheCleanup(pCache);
taosThreadMutexDestroy(&pTsdb->pgMutex);
}
}
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
typedef struct {
@ -1191,6 +1226,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err;
}
code = tsdbOpenPgCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbOpenRocksCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1221,6 +1262,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
tsdbCloseBICache(pTsdb);
tsdbCloseBCache(pTsdb);
tsdbClosePgCache(pTsdb);
tsdbCloseRocksCache(pTsdb);
}
@ -3057,7 +3099,6 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
}
*/
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
// int64_t size = 4096;
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
if (code != TSDB_CODE_SUCCESS) {
// taosMemoryFree(pBlock);
@ -3123,10 +3164,42 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
return code;
}
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
int32_t code = 0;
char key[128] = {0};
int keyLen = 0;
taosLRUCacheRelease(pCache, h, false);
getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
*handle = taosLRUCacheLookup(pCache, key, keyLen);
return code;
}
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
int32_t code = 0;
char key[128] = {0};
int keyLen = 0;
LRUHandle *handle = NULL;
getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
taosThreadMutexLock(&pFD->pTsdb->pgMutex);
handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
if (!handle) {
size_t charge = pFD->szPage;
_taos_lru_deleter_t deleter = deleteBCache;
uint8_t *pPg = taosMemoryMalloc(charge);
memcpy(pPg, pPage, charge);
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
// ignore cache updating if not ok
// code = TSDB_CODE_OUT_OF_MEMORY;
}
}
taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
return code;
}
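Taken together with `tsdbCacheGetPageS3`, the intended reader flow is probe-then-backfill: look the page up first, and only on a miss fetch it from S3 and insert it (a fragment; `fetchPageFromS3` is a hypothetical stand-in for the S3 read path in `tsdbReadFileS3` below):

```c
// Hypothetical reader flow over the new page-cache API.
LRUHandle *h = NULL;
tsdbCacheGetPageS3(pTsdb->pgCache, pFD, pgno, &h);
if (h == NULL) {  // cache miss: fetch from S3, then backfill the cache
  uint8_t *pPage = fetchPageFromS3(pFD, pgno);  // hypothetical helper
  tsdbCacheSetPageS3(pTsdb->pgCache, pFD, pgno, pPage);
} else {          // cache hit: use the page, then release the handle
  uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pTsdb->pgCache, h);
  tsdbCacheRelease(pTsdb->pgCache, h);
}
```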

View File

@ -46,6 +46,7 @@ typedef struct {
STFileSet *fset;
TABLEID tbid[1];
bool hasTSData;
bool skipTsRow;
} ctx[1];
// reader
@ -127,18 +128,18 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
continue;
}
}
/*
extern int8_t tsS3Enabled;
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
bool skipRow = false;
committer->ctx->skipTsRow = false;
if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) {
skipRow = true;
committer->ctx->skipTsRow = true;
}
*/
int64_t ts = TSDBROW_TS(&row->row);
if (skipRow && ts <= committer->ctx->maxKey) {
if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) {
ts = committer->ctx->maxKey + 1;
}
@ -402,6 +403,32 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
// reset nextKey
committer->ctx->nextKey = TSKEY_MAX;
committer->ctx->skipTsRow = false;
extern int8_t tsS3Enabled;
extern int32_t tsS3UploadDelaySec;
long s3Size(const char *object_name);
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
committer->ctx->skipTsRow = false;
if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) {
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
if (fobj && fobj->f->did.level == nlevel - 1) {
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
if (taosCheckExistFile(fobj->fname)) {
int32_t mtime = 0;
taosStatFile(fobj->fname, NULL, &mtime, NULL);
if (mtime < committer->ctx->now - tsS3UploadDelaySec) {
committer->ctx->skipTsRow = true;
}
} else if (s3Size(object_name) > 0) {
committer->ctx->skipTsRow = true;
}
}
// new fset can be written with ts data
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);

View File

@ -303,6 +303,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) {
if (f1->did.id != f2->did.id) return false;
if (f1->fid != f2->fid) return false;
if (f1->cid != f2->cid) return false;
if (f1->s3flag != f2->s3flag) return false;
return true;
}

View File

@ -58,6 +58,7 @@ int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2);
struct STFile {
tsdb_ftype_t type;
SDiskID did; // disk id
int32_t s3flag;
int32_t fid; // file id
int64_t cid; // commit id
int64_t size;

View File

@ -131,6 +131,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
}
if (pFD->s3File) {
tsdbWarn("%s file: %s", __func__, pFD->path);
return code;
}
if (pFD->pgno > 0) {
@ -177,7 +178,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
if (code == TSDB_CODE_SUCCESS && !handle) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
@ -189,7 +190,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
} else {
// seek
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
@ -253,7 +254,7 @@ _exit:
return code;
}
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t code = 0;
int64_t n = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
@ -282,10 +283,122 @@ _exit:
return code;
}
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t code = 0;
int64_t n = 0;
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
int64_t bOffset = fOffset % pFD->szPage;
ASSERT(bOffset < szPgCont);
// 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
// 2, fetch pgnoStart ~ pgnoEnd from s3
// 3, store pgs to pcache & last pg to pFD->pBuf
// 4, deliver pgs to [pBuf, pBuf + size)
while (n < size) {
if (pFD->pgno != pgno) {
LRUHandle *handle = NULL;
code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
if (code != TSDB_CODE_SUCCESS) {
if (handle) {
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
}
goto _exit;
}
if (!handle) {
break;
}
uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
memcpy(pFD->pBuf, pPage, pFD->szPage);
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
// check
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
pFD->pgno = pgno;
}
int64_t nRead = TMIN(szPgCont - bOffset, size - n);
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
n += nRead;
pgno++;
bOffset = 0;
}
if (n < size) {
// 2, retrieve pgs from s3
uint8_t *pBlock = NULL;
int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont;
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
// 3, store pages in cache
int nPage = pgnoEnd - pgno + 1;
for (int i = 0; i < nPage; ++i) {
tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
// check
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
pFD->pgno = pgno;
int64_t nRead = TMIN(szPgCont - bOffset, size - n);
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
n += nRead;
pgno++;
bOffset = 0;
}
taosMemoryFree(pBlock);
}
_exit:
return code;
}
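A worked example of the offset arithmetic above, assuming the usual tsdb page-layout macros (page content is the page minus a 4-byte checksum, and page numbers are 1-based):

```c
#include <stdint.h>
#include <stdio.h>

// Assumed macro definitions, mirroring the tsdb page layout.
#define SZ_PAGE     4096
#define SZ_PG_CONT  (SZ_PAGE - 4)  // PAGE_CONTENT_SIZE: page minus checksum
#define LOGIC_TO_FILE(lo) ((lo) / SZ_PG_CONT * SZ_PAGE + (lo) % SZ_PG_CONT)

int main(void) {
  int64_t offset = 10000;                   // logical read offset
  int64_t fOffset = LOGIC_TO_FILE(offset);  // = 10008
  int64_t pgno = fOffset / SZ_PAGE + 1;     // = 3 (pages are 1-based)
  int64_t bOffset = fOffset % SZ_PAGE;      // = 1816, always < SZ_PG_CONT
  printf("fOffset=%lld pgno=%lld bOffset=%lld\n",
         (long long)fOffset, (long long)pgno, (long long)bOffset);
  return 0;
}
```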
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t code = 0;
if (!pFD->pFD) {
code = tsdbOpenFileImpl(pFD);
if (code) {
goto _exit;
}
}
if (pFD->s3File && tsS3BlockSize < 0) {
return tsdbReadFileS3(pFD, offset, pBuf, size);
} else {
return tsdbReadFileImp(pFD, offset, pBuf, size);
}
_exit:
return code;
}
int32_t tsdbFsyncFile(STsdbFD *pFD) {
int32_t code = 0;
if (pFD->s3File) {
tsdbWarn("%s file: %s", __func__, pFD->path);
return code;
}
code = tsdbWriteFilePage(pFD);

View File

@ -206,6 +206,8 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, const
},
};
op.nf.s3flag = true;
code = TARRAY2_APPEND(rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
@ -322,27 +324,40 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
if (fobj->f->did.level == did.level) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (fobj->f->did.level == did.level) {
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1 &&
taosCheckExistFile(fobj->fname)) {
int32_t mtime = 0;
taosStatFile(fobj->fname, NULL, &mtime, NULL);
if (mtime < rtner->now - tsS3UploadDelaySec) {
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
continue;
}
/*
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (tsS3Enabled) {
int64_t fsize = 0;
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(terrno);
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__,
fobj->fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode),
__func__, fobj->fname, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit);
}
s3EvictCache(fobj->fname, fsize * 2);
}
*/
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
//}
}
// stt

View File

@ -80,8 +80,8 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
/*
int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
if (nlevel > 1 && tsS3Enabled) {
if (nlevel == 3) {
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
@ -89,7 +89,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0;
}
}
*/
for (int32_t i = 0; i < size; ++i) {
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {

View File

@ -13,14 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "audit.h"
#include "tencode.h"
#include "tmsg.h"
#include "tstrbuild.h"
#include "vnd.h"
#include "vndCos.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "audit.h"
#include "tstrbuild.h"
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
@ -263,8 +263,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
now *= 1000000;
}
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
int32_t keep = pVnode->config.tsdbCfg.keep2;
/*
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
if (nlevel > 1 && tsS3Enabled) {
if (nlevel == 3) {
keep = pVnode->config.tsdbCfg.keep1;
@ -272,6 +273,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
keep = pVnode->config.tsdbCfg.keep0;
}
}
*/
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep;
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
@ -904,7 +906,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
tbNames = taosArrayInit(req.nReqs, sizeof(char*));
tbNames = taosArrayInit(req.nReqs, sizeof(char *));
if (rsp.pArray == NULL || tbUids == NULL || tbNames == NULL) {
rcode = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -950,8 +952,8 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
taosArrayPush(rsp.pArray, &cRsp);
if(tsEnableAuditCreateTable){
char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
if (tsEnableAuditCreateTable) {
char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
strcpy(str, pCreateReq->name);
taosArrayPush(tbNames, &str);
}
@ -976,24 +978,24 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
if(tsEnableAuditCreateTable){
if (tsEnableAuditCreateTable) {
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
SStringBuilder sb = {0};
for(int32_t iReq = 0; iReq < req.nReqs; iReq++){
char** key = (char**)taosArrayGet(tbNames, iReq);
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
char **key = (char **)taosArrayGet(tbNames, iReq);
taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
if(iReq < req.nReqs - 1){
if (iReq < req.nReqs - 1) {
taosStringBuilderAppendChar(&sb, ',');
}
taosMemoryFreeClear(*key);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
char *keyJoined = taosStringBuilderGetResult(&sb, &len);
auditRecord(NULL, clusterId, "createTable", name.dbname, "", keyJoined, len);
@ -1164,7 +1166,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
// process req
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
tbNames = taosArrayInit(req.nReqs, sizeof(char*));
tbNames = taosArrayInit(req.nReqs, sizeof(char *));
if (tbUids == NULL || rsp.pArray == NULL || tbNames == NULL) goto _exit;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
@ -1187,8 +1189,8 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
taosArrayPush(rsp.pArray, &dropTbRsp);
if(tsEnableAuditCreateTable){
char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
if (tsEnableAuditCreateTable) {
char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
strcpy(str, pDropTbReq->name);
taosArrayPush(tbNames, &str);
}
@ -1197,24 +1199,24 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
tdUpdateTbUidList(pVnode->pSma, pStore, false);
if(tsEnableAuditCreateTable){
if (tsEnableAuditCreateTable) {
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
SStringBuilder sb = {0};
for(int32_t iReq = 0; iReq < req.nReqs; iReq++){
char** key = (char**)taosArrayGet(tbNames, iReq);
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
char **key = (char **)taosArrayGet(tbNames, iReq);
taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
if(iReq < req.nReqs - 1){
if (iReq < req.nReqs - 1) {
taosStringBuilderAppendChar(&sb, ',');
}
taosMemoryFreeClear(*key);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
char *keyJoined = taosStringBuilderGetResult(&sb, &len);
auditRecord(NULL, clusterId, "dropTable", name.dbname, "", keyJoined, len);
@ -1518,7 +1520,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
code = TSDB_CODE_INVALID_MSG;
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);

View File

@ -554,10 +554,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true;
taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) {
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
streamMetaWLock(pMeta);
if (pMeta->startInfo.startAllTasksFlag) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
streamMetaWUnLock(pMeta);
return;
}
@ -574,7 +576,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
}
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
streamMetaWUnLock(pMeta);
}
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {

View File

@ -1906,8 +1906,13 @@ _return:
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = code;
pRes->pRes = NULL;
if (TSDB_CODE_MND_VIEW_NOT_EXIST == code) {
ctgTaskDebug("Get view %d.%s.%s meta failed with %s", pName->acctId, pName->dbname, pName->tname,
tstrerror(code));
} else {
ctgTaskError("Get view %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
tstrerror(code));
}
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;

View File

@ -320,7 +320,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
}
case TDMT_MND_VIEW_META: {
if (TSDB_CODE_SUCCESS != rspCode) {
if (TSDB_CODE_MND_VIEW_NOT_EXIST == rspCode) {
qDebug("no success rsp for get view-meta, error:%s, viewFName:%s", tstrerror(rspCode), target);
} else {
qError("error rsp for get view-meta, error:%s, viewFName:%s", tstrerror(rspCode), target);
}
CTG_ERR_RET(rspCode);
}

View File

@ -3315,7 +3315,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "to_timestamp",
.type = FUNCTION_TYPE_TO_TIMESTAMP,
.classification = FUNC_MGT_SCALAR_FUNC,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
.translateFunc = translateToTimestamp,
.getEnvFunc = NULL,
.initFunc = NULL,

View File

@ -434,17 +434,22 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
void *rsp = NULL;
int32_t dataLen = 0;
SOutputData sOutput = {0};
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
if (TSDB_CODE_SUCCESS == code) {
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput);
}
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
return TSDB_CODE_SUCCESS;
}
if (rsp) {
if (NULL != rsp) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
}
}
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
@ -456,7 +461,6 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
dataLen);
}
}
}
return TSDB_CODE_SUCCESS;
}

View File

@ -1230,7 +1230,7 @@ int32_t toTimestampFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam
code = taosChar2Ts(format, &formats, tsStr, &ts, precision, errMsg, 128);
if (code) {
qError("func to_timestamp failed %s", errMsg);
code = TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED;
code = code == -1 ? TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR : TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR;
break;
}
colDataSetVal(pOutput->columnData, i, (char *)&ts, false);
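The hunk above splits the old catch-all TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED into two causes, keyed off the return value of taosChar2Ts. A minimal sketch of the mapping, assuming (as this hunk alone implies, not verified elsewhere) that -1 means the input string failed to match the format, while any other non-zero code means the matched components do not form a valid timestamp:

static int32_t mapToTimestampError(int32_t rc) {
  if (rc == 0) return TSDB_CODE_SUCCESS;
  return (rc == -1) ? TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR  // string/format mismatch
                    : TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR;     // matched, but invalid timestamp value
}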

View File

@ -184,13 +184,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
{ // todo: remove this when the pipeline checkpoint generating is used.
SStreamMeta* pMeta = pTask->pMeta;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
}
// todo fix race condition: set the status and append checkpoint block
@ -281,8 +281,9 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
int32_t vgId = pMeta->vgId;
int32_t code = 0;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
@ -304,10 +305,10 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
char* str = NULL;
streamTaskGetStatus(p, &str);
int32_t code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return -1;
} else { // save the task
streamMetaSaveTask(pMeta, p);
@ -320,17 +321,16 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
str);
}
if (streamMetaCommit(pMeta) < 0) {
taosWUnLockLatch(&pMeta->lock);
code = streamMetaCommit(pMeta);
if (code < 0) {
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
checkpointId, terrstr());
return -1;
} else {
taosWUnLockLatch(&pMeta->lock);
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
}
return TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
return code;
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {

View File

@ -429,6 +429,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = 0;
{
SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
taosArrayClear(pTask->msgInfo.pRetryList);
@ -440,7 +441,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t numOfFailed = taosArrayGetSize(pList);
stDebug("s-task:%s (child taskId:%d) re-try shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
id, pTask->info.selfChildId, numOfFailed, msgId);
for (int32_t i = 0; i < numOfFailed; i++) {
@ -471,6 +472,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet);
}
taosArrayDestroy(pList);
}
if (code != TSDB_CODE_SUCCESS) {
@ -1004,6 +1007,8 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
info.msg.info = *pRpcInfo;
taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
}

View File

@ -285,33 +285,31 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
stError(
"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
"fill-history task",
pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId);
id, (int32_t) pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 2. save to disk
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else {
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id,
pStreamTask->id.idStr);
}
ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL);
ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// It must be halted for a source stream task, since when the related scan-history-data task starts to scan the history
@ -321,7 +319,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} else {
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
}
// wait for the stream task to handle all in the inputQ, and to be idle
@ -331,7 +329,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
char* p = NULL;
streamTaskGetStatus(pStreamTask, &p);
status = streamTaskGetStatus(pStreamTask, &p);
if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
@ -350,30 +354,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume.
// 3. resume the state of stream task, after this function, the stream task will run immediately.
streamTaskResume(pStreamTask);
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
// 4. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info
// CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
// 6. save to disk
taosWLockLatch(&pMeta->lock);
// 5. save to disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
// streamMetaSaveTask(pMeta, pStreamTask);
// if (streamMetaCommit(pMeta) < 0) {
// persist to disk
// }
taosWUnLockLatch(&pMeta->lock);
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
// 6. pause allowed.
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);

View File

@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
}
}
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) {
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
}
taosMsleep(100);
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
@ -447,20 +443,20 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
if (!streamTaskShouldStop(*ppTask)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
streamMetaRUnLock(pMeta);
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask;
}
}
taosRUnLockLatch(&pMeta->lock);
streamMetaRUnLock(pMeta);
return NULL;
}
@ -491,7 +487,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
SStreamTask* pTask = NULL;
// pre-delete operation
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
@ -508,34 +504,35 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return 0;
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
while (1) {
taosRLockLatch(&pMeta->lock);
streamMetaRLock(pMeta);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
if ((*ppTask)->status.timerActive == 0) {
taosRUnLockLatch(&pMeta->lock);
streamMetaRUnLock(pMeta);
break;
}
taosMsleep(10);
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
taosRUnLockLatch(&pMeta->lock);
streamMetaRUnLock(pMeta);
} else {
taosRUnLockLatch(&pMeta->lock);
streamMetaRUnLock(pMeta);
break;
}
}
// let's do delete of stream task
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
@ -565,19 +562,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return 0;
}
int32_t streamMetaBegin(SStreamMeta* pMeta) {
taosWLockLatch(&pMeta->lock);
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
taosWUnLockLatch(&pMeta->lock);
return -1;
}
taosWUnLockLatch(&pMeta->lock);
return 0;
streamMetaWLock(pMeta);
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
streamMetaWUnLock(pMeta);
return code;
}
// todo add error log
@ -846,6 +840,12 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
return false;
}
static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
taosArrayDestroy(pMsg->pTaskStatus);
taosArrayDestroy(pMsg->pUpdateNodes);
taosArrayDestroy(pIdList);
}
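clearHbMsg becomes the single cleanup point for the goto _end exits added to metaHbToMnode below, replacing the repeated taosArrayDestroy/taosReleaseRef sequences the encode error paths used to carry. A minimal sketch of the idiom, with buildAndSendHb as a purely illustrative name:

static int32_t buildAndSendHb(SStreamMeta* pMeta) {
  SStreamHbMsg hbMsg = {0};
  SArray*      pIdList = NULL;
  int32_t      code = 0;
  // ... allocate hbMsg.pTaskStatus, hbMsg.pUpdateNodes and pIdList; on any
  // alloc or encode failure, set code and goto _end instead of duplicating cleanup ...
_end:
  clearHbMsg(&hbMsg, pIdList);  // one cleanup point for every exit path
  return code;
}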
void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
@ -884,19 +884,33 @@ void metaHbToMnode(void* param, void* tmrId) {
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
SStreamHbMsg hbMsg = {0};
taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0};
bool hasMnodeEpset = false;
int32_t stage = 0;
streamMetaRLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
hbMsg.vgId = pMeta->vgId;
stage = pMeta->stage;
SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL);
streamMetaRUnLock(pMeta);
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId* pId = taosArrayGet(pIdList, i);
streamMetaRLock(pMeta);
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
streamMetaRUnLock(pMeta);
if (pTask == NULL) {
continue;
}
// do not report the status of fill-history tasks
if ((*pTask)->info.fillHistory == 1) {
@ -906,12 +920,12 @@ void metaHbToMnode(void* param, void* tmrId) {
STaskStatusEntry entry = {
.id = *pId,
.status = streamTaskGetStatus(*pTask, NULL),
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.nodeId = hbMsg.vgId,
.stage = stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
};
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
@ -930,7 +944,7 @@ void metaHbToMnode(void* param, void* tmrId) {
taosThreadMutexLock(&(*pTask)->lock);
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
for (int j = 0; j < num; ++j) {
int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
bool exist = false;
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
@ -957,7 +971,6 @@ void metaHbToMnode(void* param, void* tmrId) {
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
taosRUnLockLatch(&pMeta->lock);
if (hasMnodeEpset) {
int32_t code = 0;
@ -966,17 +979,13 @@ void metaHbToMnode(void* param, void* tmrId) {
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
goto _end;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
goto _end;
}
SEncoder encoder;
@ -984,15 +993,12 @@ void metaHbToMnode(void* param, void* tmrId) {
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
goto _end;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
SRpcMsg msg = {.info.noResp = 1,};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
pMeta->pHbInfo->hbCount += 1;
@ -1003,16 +1009,15 @@ void metaHbToMnode(void* param, void* tmrId) {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
}
taosArrayDestroy(hbMsg.pTaskStatus);
taosArrayDestroy(hbMsg.pUpdateNodes);
_end:
clearHbMsg(&hbMsg, pIdList);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}
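The reworked heartbeat above no longer holds the meta lock while building the whole message: it duplicates the task-id list under a short read lock, then re-acquires the lock only for each task lookup. A minimal sketch of that copy-then-iterate pattern (snapshotTaskIds is an illustrative name; taosArrayDup and the lock wrappers are the ones used in the hunk):

static SArray* snapshotTaskIds(SStreamMeta* pMeta) {
  streamMetaRLock(pMeta);
  SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL);  // shallow copy of the STaskId entries
  streamMetaRUnLock(pMeta);
  return pIdList;  // safe to iterate without the lock; each id is re-validated via taosHashGet later
}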
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
void* pIter = NULL;
while (1) {
@ -1027,7 +1032,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
}
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return inTimer;
}
@ -1037,7 +1042,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
void* pIter = NULL;
while (1) {
@ -1051,7 +1056,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
streamTaskStop(pTask);
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
@ -1091,3 +1096,22 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
pStartInfo->startAllTasksFlag = 0;
pStartInfo->readyTs = 0;
}
void streamMetaRLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosRLockLatch(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock);
}
void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosWLockLatch(&pMeta->lock);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock);
}
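The four wrappers above put a vgId-tagged trace line in front of every latch operation, so lock acquisition and release ordering can be reconstructed from trace logs when a vnode hangs. A hypothetical caller sketch of the pattern the rest of this diff converts to (doUpdateUnderLock is illustrative only):

static void doUpdateUnderLock(SStreamMeta* pMeta) {
  streamMetaWLock(pMeta);    // emits "vgId:%d meta-wlock" before blocking on the latch
  // ... mutate pMeta->pTasksMap / pMeta->pTaskList here ...
  streamMetaWUnLock(pMeta);  // emits "vgId:%d meta-wunlock", making missed unlocks visible
}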

View File

@ -580,13 +580,13 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
// executed in the scan-history-complete callback msg, ready to process data from inputQ
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
streamTaskSetSchedStatusInactive(pTask);
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
streamMetaSaveTask(pMeta, pTask);
streamMetaCommit(pMeta);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
// history data scan in the stream time window finished, now let's enable the pause
streamTaskEnablePause(pTask);
@ -624,7 +624,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
if (ppTask) {
ASSERT((*ppTask)->status.timerActive >= 1);
@ -637,11 +638,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
taosMemoryFree(pInfo);
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return;
}
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
if (pTask != NULL) {
@ -934,66 +935,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
}
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
#if 0
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
const char* str = streamGetTaskStatusStr(status);
if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
return;
}
if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
return;
}
while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
return;
}
//
// if (pTask->status.downstreamReady == 0) {
// ASSERT(pTask->execInfo.start == 0);
// stDebug("s-task:%s in check downstream procedure, abort and paused", pTask->id.idStr);
// break;
// }
const char* pStatus = streamGetTaskStatusStr(status);
stDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
taosMsleep(100);
}
// todo: use the task lock, instead of meta lock
taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
taosWUnLockLatch(&pMeta->lock);
stDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
taosWUnLockLatch(&pMeta->lock);
#endif
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
@ -1029,19 +970,6 @@ void streamTaskResume(SStreamTask* pTask) {
}
}
// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
// pre-condition check
// const char* id = pTask->id.idStr;
// while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
// taosMsleep(100);
// }
//
// stDebug("s-task:%s disable task pause", id);
// pTask->status.pauseAllowed = 0;
}
void streamTaskEnablePause(SStreamTask* pTask) {
stDebug("s-task:%s enable task pause", pTask->id.idStr);
pTask->status.pauseAllowed = 1;
@ -1050,7 +978,7 @@ void streamTaskEnablePause(SStreamTask* pTask) {
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
taosWLockLatch(&pMeta->lock);
streamMetaWLock(pMeta);
STaskId id = streamTaskExtractKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
@ -1071,6 +999,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
pStartInfo->elapsedTime / 1000.0);
}
taosWUnLockLatch(&pMeta->lock);
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}

View File

@ -121,7 +121,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
#ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta;
pState->streamBackendRid = pMeta->streamBackendRid;
// taosWLockLatch(&pMeta->lock);
// streamMetaWLock(pMeta);
taosThreadMutexLock(&pMeta->backendMutex);
void* uniqueId =
taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);

View File

@ -20,6 +20,8 @@
#include "ttimer.h"
#include "wal.h"
#define GET_EVT_NAME(_ev) (StreamTaskEventList[(_ev)].name)
SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__READY, .name = "ready"},
{.state = TASK_STATUS__DROPPING, .name = "dropped"},
@ -66,7 +68,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name);
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
return 0;
}
@ -80,13 +82,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
}
return TSDB_CODE_SUCCESS;
}
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
return 0;
@ -105,6 +100,39 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) {
if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) {
return (state != TASK_STATUS__UNINIT);
}
if (event == TASK_EVENT_SCANHIST_DONE) {
return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY);
}
if (event == TASK_EVENT_GEN_CHECKPOINT) {
return (state != TASK_STATUS__READY);
}
if (event == TASK_EVENT_CHECKPOINT_DONE) {
return (state != TASK_STATUS__CK);
}
// todo refactor later
if (event == TASK_EVENT_RESUME) {
return true;
}
if (event == TASK_EVENT_HALT) {
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT || state == TASK_STATUS__STOP ||
state == TASK_STATUS__SCAN_HISTORY) {
return true;
}
}
return false;
}
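A worked example of the guard, as a sketch against the events and states shown above (exampleGuard is illustrative only and assumes this file's enums; streamTaskFindTransform below returns NULL when the predicate fires, which streamTaskHandleEvent reports as TSDB_CODE_STREAM_INVALID_STATETRANS):

#include <assert.h>

static void exampleGuard(void) {
  // a checkpoint may only be generated from READY, so CK -> GEN_CHECKPOINT is invalid:
  assert(isInvalidStateTransfer(TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT));
  // finishing a history scan from SCAN_HISTORY is one of the two legal source states:
  assert(!isInvalidStateTransfer(TASK_STATUS__SCAN_HISTORY, TASK_EVENT_SCANHIST_DONE));
}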
// todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) {
int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
@ -115,10 +143,8 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
}
}
if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) {
} else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) {
// the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block.
if (isInvalidStateTransfer(state, event)) {
return NULL;
} else {
ASSERT(0);
}
@ -128,9 +154,10 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
void streamTaskRestoreStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
taosThreadMutexLock(&pTask->lock);
ASSERT(pSM->pActiveTrans == NULL);
taosThreadMutexLock(&pTask->lock);
ASSERT(pSM->pActiveTrans == NULL);
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
SStreamTaskState state = pSM->current;
@ -199,14 +226,14 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
taosThreadMutexUnlock(&pTask->lock);
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name);
stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
return TSDB_CODE_SUCCESS;
} else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name);
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
taosMsleep(100);
} else {
stDebug("s-task:%s is dropped or stopped already, not wait.", id);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
}
@ -227,37 +254,41 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
}
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = pSM->pTask;
STaskStateTrans* pTrans = NULL;
while (1) {
taosThreadMutexLock(&pTask->lock);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
EStreamTaskEvent evt = pSM->pActiveTrans->event;
taosThreadMutexUnlock(&pTask->lock);
taosMsleep(100);
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name);
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
taosMsleep(100);
} else {
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name);
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event.
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
if (pSM->pActiveTrans != NULL) {
// currently in some state transfer procedure, not auto invoke transfer, abort it
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name,
pSM->pActiveTrans->next.name, StreamTaskEventList[event].name);
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
}
doHandleEvent(pSM, event, pTrans);
code = doHandleEvent(pSM, event, pTrans);
break;
}
}
return TSDB_CODE_SUCCESS;
return code;
}
static void keepPrevInfo(SStreamTaskSM* pSM) {
@ -272,24 +303,27 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
// do update the task status
taosThreadMutexLock(&pTask->lock);
STaskStateTrans* pTrans = pSM->pActiveTrans;
STaskStateTrans* pTrans = pSM->pActiveTrans;
if (pTrans == NULL) {
ETaskStatus s = pSM->current.state;
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP ||
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
// the pSM->prev.evt may be 0, so the printed trigger-event name may not be meaningful.
stDebug("s-task:%s event:%s handling failed, current status:%s, trigger event:%s", pTask->id.idStr,
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name);
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
stDebug("s-task:%s unlockx", pTask->id.idStr);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
if (pTrans->event != event) {
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name);
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
keepPrevInfo(pSM);
@ -303,14 +337,17 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->pWaitingEventList);
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
// OK, let's handle the attached event, since the task has reached the required status now
if (pSM->current.state == pEvtInfo->status) {
stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr,
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
// remove it
taosArrayPop(pSM->pWaitingEventList);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
@ -325,13 +362,18 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
} else {
return code;
}
} else {
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
pSM->current.name, GET_EVT_NAME(pEvtInfo->event),
StreamTaskStatusList[pEvtInfo->status].name);
}
} else {
taosThreadMutexUnlock(&pTask->lock);
int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
}
return TSDB_CODE_SUCCESS;
@ -440,6 +482,10 @@ void doInitStateTransferTable(void) {
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);

View File

@ -23,6 +23,8 @@
#include "syncReplication.h"
#include "syncUtil.h"
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ);
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
@ -160,8 +162,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
}
int dataLen = 0;
if (snapInfo.data) {
SSyncTLV *datHead = snapInfo.data;
void *pData = snapInfo.data;
int32_t type = 0;
if (pData) {
type = snapInfo.type;
SSyncTLV *datHead = pData;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
terrno = TSDB_CODE_INVALID_DATA_FMT;
@ -170,37 +175,12 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
dataLen = sizeof(SSyncTLV) + datHead->len;
}
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) {
goto _out;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (dataLen > 0) {
pMsg->payloadType = snapInfo.type;
memcpy(pMsg->data, snapInfo.data, dataLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
goto _out;
}
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
code = 0;
_out:
if (snapInfo.data) {
@ -232,6 +212,43 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
}
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
int32_t code = -1;
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "failed to build snap replication msg since %s", terrstr());
goto _OUT;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = seq;
if (pBlock != NULL && blockLen > 0) {
memcpy(pMsg->data, pBlock, blockLen);
}
pMsg->payloadType = typ;
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "failed to send snap replication msg since %s. seq:%d", terrstr(), seq);
goto _OUT;
}
code = 0;
_OUT:
return code;
}
// when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
@ -273,33 +290,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
goto _OUT;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
void *pBlock = (pBlk) ? pBlk->pBlock : NULL;
if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) {
goto _OUT;
}
@ -336,36 +330,17 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
}
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
goto _out;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pBlk->seq;
if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {
goto _out;
}
pBlk->sendTimeMs = nowMs;
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
goto _out;
}
}
code = 0;
_out:;
taosThreadMutexUnlock(&pSndBuf->mutex);
@ -861,7 +836,7 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->ack = pMsg->seq;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
@ -893,13 +868,13 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
} else {
} else if (pMsg->seq < pRcvBuf->start) {
syncSnapSendRsp(pReceiver, pMsg, code);
goto _out;
}
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
if (pRcvBuf->entries[seq]) {
if (pRcvBuf->entries[seq % pRcvBuf->size]) {
pRcvBuf->cursor = seq;
} else {
break;
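This scan-side fix matches the store side a few lines up, which already writes at pMsg->seq % pRcvBuf->size: entries is a fixed-size ring while seq grows without bound, so reading entries[seq] walks off the ring once seq reaches size. A minimal sketch of the invariant (ringAt is illustrative):

static inline void* ringAt(void** entries, int64_t ringSize, int64_t seq) {
  return entries[seq % ringSize];  // e.g. ringSize=8: seq 9 lands on slot 1 once its earlier occupant is consumed
}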

View File

@ -613,7 +613,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Invalid function par
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid function para value")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION, "Not builtin function")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_DUP_TIMESTAMP, "Duplicate timestamps not allowed in function")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED, "Func to_timestamp failed, check log for detail")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR, "Func to_timestamp failed, format mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR, "Func to_timestamp failed, wrong timestamp")
//udf
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
@ -680,6 +681,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")

View File

@ -166,6 +166,44 @@ class TDTestCase:
def run(self):
self.prepareTestEnv()
self.test_to_timestamp()
self.test_ns_to_timestamp()
def create_tables(self):
tdSql.execute("create database if not exists test_us precision 'us'")
tdSql.execute("create database if not exists test_ns precision 'ns'")
tdSql.execute("use test_us")
tdSql.execute(f"CREATE STABLE `meters_us` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT, `ts2` timestamp) TAGS (`t1` INT)")
tdSql.execute(f"CREATE TABLE `ctb1_us` USING `meters_us` (`t1`) TAGS (1)")
tdSql.execute(f"CREATE TABLE `ctb2_us` USING `meters_us` (`t1`) TAGS (2)")
tdSql.execute("use test_ns")
tdSql.execute(f"CREATE STABLE `meters_ns` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT, `ts2` timestamp) TAGS (`t1` INT)")
tdSql.execute(f"CREATE TABLE `ctb1_ns` USING `meters_ns` (`t1`) TAGS (1)")
tdSql.execute(f"CREATE TABLE `ctb2_ns` USING `meters_ns` (`t1`) TAGS (2)")
def insert_ns_data(self):
tdLog.debug("start to insert data ............")
tdSql.execute(f"INSERT INTO `test_us`.`ctb1_us` VALUES ('2023-07-01 00:00:00.123456', 10.30000, 100, '2023-07-01 00:00:00.123456')")
tdSql.execute(f"INSERT INTO `test_us`.`ctb2_us` VALUES ('2023-08-01 00:00:00.123456', 20.30000, 200, '2023-07-01 00:00:00.123456')")
tdSql.execute(f"INSERT INTO `test_ns`.`ctb1_ns` VALUES ('2023-07-01 00:00:00.123456789', 10.30000, 100, '2023-07-01 00:00:00.123456000')")
tdSql.execute(f"INSERT INTO `test_ns`.`ctb2_ns` VALUES ('2023-08-01 00:00:00.123456789', 20.30000, 200, '2023-08-01 00:00:00.123456789')")
tdLog.debug("insert data ............ [OK]")
def test_ns_to_timestamp(self):
self.create_tables()
self.insert_ns_data()
tdSql.query("select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')", queryTimes=1)
tdSql.checkData(0, 0, 1690855810123)
tdSql.execute('use test_ns', queryTimes=1)
tdSql.query("select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')", queryTimes=1)
tdSql.checkData(0, 0, 1690855810123)
tdSql.query("select to_char(ts2, 'yyyy-mm-dd hh:mi:ss.ns') from meters_ns", queryTimes=1)
tdSql.checkData(0, 0, '2023-07-01 12:00:00.123456000')
tdSql.checkData(1, 0, '2023-08-01 12:00:00.123456789')
tdSql.query("select to_timestamp(to_char(ts2, 'yyyy-mm-dd hh:mi:ss.ns'), 'yyyy-mm-dd hh:mi:ss.ns') from meters_ns", queryTimes=1)
tdSql.checkData(0, 0, 1688140800123456000)
tdSql.checkData(1, 0, 1690819200123456789)
def stop(self):
tdSql.close()