This commit is contained in:
yihaoDeng 2024-04-29 17:00:17 +08:00
commit b71e559cf4
27 changed files with 268 additions and 125 deletions

View File

@ -25,7 +25,7 @@ create_definition:
col_name column_definition col_name column_definition
column_definition: column_definition:
type_name [comment 'string_value'] [PRIMARY KEY] type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type']
table_options: table_options:
table_option ... table_option ...
@ -51,6 +51,7 @@ table_option: {
Only ASCII visible characters can be used with escape character. Only ASCII visible characters can be used with escape character.
**Parameter description** **Parameter description**
1. COMMENT: specifies comments for the table. This parameter can be used with supertables, standard tables, and subtables. 1. COMMENT: specifies comments for the table. This parameter can be used with supertables, standard tables, and subtables.
2. SMA: specifies functions on which to enable small materialized aggregates (SMA). SMA is user-defined precomputation of aggregates based on data blocks. Enter one of the following values: max, min, or sum This parameter can be used with supertables and standard tables. 2. SMA: specifies functions on which to enable small materialized aggregates (SMA). SMA is user-defined precomputation of aggregates based on data blocks. Enter one of the following values: max, min, or sum This parameter can be used with supertables and standard tables.
3. TTL: specifies the time to live (TTL) for the table. If TTL is specified when creatinga table, after the time period for which the table has been existing is over TTL, TDengine will automatically delete the table. Please be noted that the system may not delete the table at the exact moment that the TTL expires but guarantee there is such a system and finally the table will be deleted. The unit of TTL is in days. The default value is 0, i.e. never expire. 3. TTL: specifies the time to live (TTL) for the table. If TTL is specified when creatinga table, after the time period for which the table has been existing is over TTL, TDengine will automatically delete the table. Please be noted that the system may not delete the table at the exact moment that the TTL expires but guarantee there is such a system and finally the table will be deleted. The unit of TTL is in days. The default value is 0, i.e. never expire.
@ -104,6 +105,7 @@ alter_table_option: {
**More explanations** **More explanations**
You can perform the following modifications on existing tables: You can perform the following modifications on existing tables:
1. ADD COLUMN: adds a column to the supertable. 1. ADD COLUMN: adds a column to the supertable.
2. DROP COLUMN: deletes a column from the supertable. 2. DROP COLUMN: deletes a column from the supertable.
3. MODIFY COLUMN: changes the length of the data type specified for the column. Note that you can only specify a length greater than the current length. 3. MODIFY COLUMN: changes the length of the data type specified for the column. Note that you can only specify a length greater than the current length.
@ -154,6 +156,7 @@ alter_table_option: {
``` ```
**More explanations** **More explanations**
1. Only the value of a tag can be modified directly. For all other modifications, you must modify the supertable from which the subtable was created. 1. Only the value of a tag can be modified directly. For all other modifications, you must modify the supertable from which the subtable was created.
### Change Tag Value Of Sub Table ### Change Tag Value Of Sub Table

View File

@ -23,7 +23,10 @@ create_subtable_clause: {
} }
create_definition: create_definition:
col_name column_type [PRIMARY KEY] col_name column_definition
column_definition:
type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type']
table_options: table_options:
table_option ... table_option ...

View File

@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path); int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
void s3EvictCache(const char *path, long object_size); void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name); long s3Size(const char *object_name);
int32_t s3GetObjectToFile(const char *object_name, char *fileName); int32_t s3GetObjectToFile(const char *object_name, const char *fileName);
#define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024) #define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024)

View File

@ -56,6 +56,7 @@ extern "C" {
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
#define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_RESUME_TASK (-6)
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
@ -443,6 +444,7 @@ typedef struct SDownstreamStatusInfo {
typedef struct STaskCheckInfo { typedef struct STaskCheckInfo {
SArray* pList; SArray* pList;
int64_t startTs; int64_t startTs;
int64_t timeoutStartTs;
int32_t notReadyTasks; int32_t notReadyTasks;
int32_t inCheckProcess; int32_t inCheckProcess;
int32_t stopCheckProcess; int32_t stopCheckProcess;
@ -547,7 +549,7 @@ typedef struct SStreamMeta {
SArray* chkpSaved; SArray* chkpSaved;
SArray* chkpInUse; SArray* chkpInUse;
SRWLatch chkpDirLock; SRWLatch chkpDirLock;
void* qHandle; void* qHandle; // todo remove it
void* bkdChkptMgt; void* bkdChkptMgt;
} SStreamMeta; } SStreamMeta;
@ -885,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready); int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta);

View File

@ -78,6 +78,7 @@ typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType); typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef bool (*RpcNoDelayfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle); typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit { typedef struct SRpcInit {
@ -118,6 +119,8 @@ typedef struct SRpcInit {
// fail fast fp // fail fast fp
RpcFFfp ffp; RpcFFfp ffp;
RpcNoDelayfp noDelayFp;
int32_t connLimitNum; int32_t connLimitNum;
int32_t connLimitLock; int32_t connLimitLock;
int32_t timeToGetConn; int32_t timeToGetConn;

View File

@ -77,6 +77,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)

View File

@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
} }
int32_t s3GetObjectToFile(const char *object_name, char *fileName) { int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
const char *ifMatch = 0, *ifNotMatch = 0; const char *ifMatch = 0, *ifNotMatch = 0;
@ -1733,6 +1733,6 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
void s3EvictCache(const char *path, long object_size) {} void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; } long s3Size(const char *object_name) { return 0; }
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; } int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
int32_t s3GetObjectToFile(const char *object_name, char *fileName) { return 0; } int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { return 0; }
#endif #endif

View File

@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };
@ -193,6 +194,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

View File

@ -60,7 +60,7 @@ int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60; int32_t tsKeepAliveIdle = 60;
int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfTaskQueueThreads = 10;
int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfMnodeReadThreads = 1;
@ -552,12 +552,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000);
if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = tsNumOfCores;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 10);
if (tsNumOfTaskQueueThreads >= 50) {
tsNumOfTaskQueueThreads = 50;
}
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0)
return -1; return -1;
if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;

View File

@ -330,7 +330,13 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) {
return false; return false;
} }
} }
static bool rpcNoDelayMsg(tmsg_t msgType) {
if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE ||
msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
return true;
}
return false;
}
int32_t dmInitClient(SDnode *pDnode) { int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
@ -356,6 +362,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp; rpcInit.ffp = dmFailFastFp;
rpcInit.noDelayFp = rpcNoDelayMsg;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2; int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
@ -365,6 +373,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024; rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);

View File

@ -2340,11 +2340,6 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
} }
_OVER: _OVER:
if (code != 0) {
mError("failed to get table tsma %s since %s fetching with tsma name %d", tsmaReq.name, terrstr(),
tsmaReq.fetchingWithTsmaName);
}
tFreeTableTSMAInfoRsp(&rsp); tFreeTableTSMAInfoRsp(&rsp);
return code; return code;
} }

View File

@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
// checkpoint backup type
char backup[20 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(backup, "none")
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
// history scan idle
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
strcpy(scanHistoryIdle, "100a"); strcpy(scanHistoryIdle, "100a");
@ -1644,10 +1651,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false);
// checkpoint info // checkpoint version
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
// checkpoint backup status
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, 0, true);
// ds_err_info // ds_err_info
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, 0, true); colDataSetVal(pColInfo, numOfRows, 0, true);

View File

@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry {
int32_t transId; int32_t transId;
} STaskUpdateEntry; } STaskUpdateEntry;
typedef struct SMStreamCheckpointReadyRspMsg {
SMsgHead head;
} SMStreamCheckpointReadyRspMsg;
static STaskId replaceStreamTaskId(SStreamTask* pTask) { static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory); ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
int64_t initTs = 0;
int64_t now = taosGetTimestampMs();
STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId};
STaskId fId = {0};
bool hasHistoryTask = false;
// todo extract method
if (!isLeader) { if (!isLeader) {
// this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map.
streamMetaRLock(pMeta);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
}
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
} else { return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
streamMetaRUnLock(pMeta);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
return code;
} }
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
streamMetaRLock(pMeta); return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
// let's try to find this task in hashmap
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
}
} else { // not exist even in the hash map of meta, forget it
streamMetaRUnLock(pMeta);
}
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return code;
} }
code = streamProcessCheckRsp(pTask, &rsp); code = streamProcessCheckRsp(pTask, &rsp);
@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
return code; return code;
} }
typedef struct SMStreamCheckpointReadyRspMsg {
SMsgHead head;
} SMStreamCheckpointReadyRspMsg;
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
streamMetaStopAllTasks(pMeta); streamMetaStopAllTasks(pMeta);
return 0; return 0;
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);

View File

@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
void streamTaskSetCheckpointFailedId(SStreamTask* pTask); void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskGetTaskId(const SStreamTask* pTask); STaskId streamTaskGetTaskId(const SStreamTask* pTask);
@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t streamTaskBackupCheckpoint(char* id, char* path);
int32_t downloadCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path);
int32_t deleteCheckpoint(char* id); int32_t deleteCheckpoint(char* id);
int32_t deleteCheckpointFile(char* id, char* name);
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -175,7 +175,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref);
if (pInfo->checkRspTmr == NULL) { if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
@ -194,7 +194,7 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
pInfo->stopCheckProcess = 1; pInfo->stopCheckProcess = 1;
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set stop check rsp mon", id); stDebug("s-task:%s set stop check-rsp monit", id);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -272,6 +272,8 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
} }
pInfo->startTs = startTs; pInfo->startTs = startTs;
pInfo->timeoutStartTs = startTs;
pInfo->stopCheckProcess = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -293,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) { if (p != NULL) {
if (reqId != p->reqId) { if (reqId != p->reqId) {
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId); id, reqId, p->reqId, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -329,7 +331,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
stDebug("s-task:%s set the in-check-procedure flag", id); stDebug("s-task:%s set the in check-rsp flag", id);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -343,9 +345,10 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
} }
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0; pInfo->startTs = 0;
pInfo->timeoutStartTs = 0;
pInfo->notReadyTasks = 0; pInfo->notReadyTasks = 0;
pInfo->inCheckProcess = 0; pInfo->inCheckProcess = 0;
pInfo->stopCheckProcess = 0; pInfo->stopCheckProcess = 0;
@ -458,6 +461,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
pInfo->timeoutStartTs = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTimeout; ++i) { for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
@ -488,7 +492,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
} else { } else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
} }
} }
@ -517,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
} }
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
// of restart in timer thread will result in a dead lock.
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr());
return -1;
}
stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
return 0;
}
// this function is executed in timer thread
void rspMonitorFn(void* param, void* tmrId) { void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
@ -524,7 +552,7 @@ void rspMonitorFn(void* param, void* tmrId) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo; STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs; int64_t timeoutDuration = now - pInfo->timeoutStartTs;
ETaskStatus state = pStat->state; ETaskStatus state = pStat->state;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t numOfReady = 0; int32_t numOfReady = 0;
@ -541,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id); streamTaskCompleteCheckRsp(pInfo, true, id);
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return; return;
@ -577,7 +600,7 @@ void rspMonitorFn(void* param, void* tmrId) {
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) { if (pStat->state == TASK_STATUS__UNINIT) {
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
} else { // unexpected status } else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
} }
@ -614,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) {
streamTaskCompleteCheckRsp(pInfo, false, id); streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
@ -639,8 +656,10 @@ void rspMonitorFn(void* param, void* tmrId) {
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", stDebug(
id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
"ready:%d",
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList); taosArrayDestroy(pTimeoutList);

View File

@ -26,6 +26,9 @@ typedef struct {
SStreamTask* pTask; SStreamTask* pTask;
} SAsyncUploadArg; } SAsyncUploadArg;
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(char* id, char* name);
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -376,21 +379,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
return code; return code;
} }
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) { void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
} }
int32_t getChkpMeta(char* id, char* path, SArray* list) { static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
char* file = taosMemoryCalloc(1, strlen(path) + 32); char* file = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
int32_t code = downloadCheckpointByName(id, "META", file);
int32_t code = downloadCheckpointDataByName(id, "META", file);
if (code != 0) { if (code != 0) {
stDebug("chkp failed to download meta file:%s", file); stDebug("chkp failed to download meta file:%s", file);
taosMemoryFree(file); taosMemoryFree(file);
return code; return code;
} }
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
char buf[128] = {0}; char buf[128] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
@ -427,7 +432,7 @@ int32_t uploadCheckpointData(void* param) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
} }
if (arg->type == DATA_UPLOAD_S3) { if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
} }
} }
@ -457,8 +462,7 @@ int32_t uploadCheckpointData(void* param) {
return code; return code;
} }
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) { if (type == DATA_UPLOAD_DISABLE) {
return 0; return 0;
@ -514,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamSaveTaskCheckpointInfo(pTask, ckId); code = streamSaveTaskCheckpointInfo(pTask, ckId);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadChkp(pTask, ckId, (char*)id); code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
} }
@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamTaskSetCheckpointFailedId(pTask); streamTaskSetFailedCheckpointId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
} }
@ -585,12 +589,12 @@ static int32_t uploadCheckpointToS3(char* id, char* path) {
stDebug("[s3] upload checkpoint:%s", filename); stDebug("[s3] upload checkpoint:%s", filename);
// break; // break;
} }
taosCloseDir(&pDir);
taosCloseDir(&pDir);
return 0; return 0;
} }
static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
int32_t code = 0; int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname); sprintf(buf, "%s/%s", id, fname);
@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) {
} }
// fileName: CURRENT // fileName: CURRENT
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
stError("uploadCheckpointByName parameters invalid"); stError("uploadCheckpointByName parameters invalid");
return -1; return -1;
} }
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
return 0; return 0;
} else if (tsS3StreamEnabled) { } else if (tsS3StreamEnabled) {
return downloadCheckpointByNameS3(id, fname, dstName); return downloadCheckpointByNameS3(id, fname, dstName);
} }
return 0; return 0;
} }
@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) {
stError("downloadCheckpoint parameters invalid"); stError("downloadCheckpoint parameters invalid");
return -1; return -1;
} }
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
return downloadRsync(id, path); return downloadRsync(id, path);
} else if (tsS3StreamEnabled) { } else if (tsS3StreamEnabled) {
return s3GetObjectsByPrefix(id, path); return s3GetObjectsByPrefix(id, path);
} }
return 0; return 0;
} }

View File

@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec. * appropriate batch of blocks should be handled in 5 to 10 sec.
*/ */
int32_t doStreamExecTask(SStreamTask* pTask) { static int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.

View File

@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { if (pState->state == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask); streamTaskSetFailedCheckpointId(pTask);
} else { } else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
} }
@ -1707,3 +1707,38 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
return 0; return 0;
} }
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list.
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
streamId, taskId, pMeta->vgId);
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
return code;
}

View File

@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask); streamTaskSetFailedCheckpointId(pTask);
} }
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
} }
@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state; ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask); streamTaskSetFailedCheckpointId(pTask);
} }
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);

View File

@ -63,6 +63,7 @@ typedef struct {
bool (*startTimer)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle); void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType); bool (*failFastFp)(tmsg_t msgType);
bool (*noDelayFp)(tmsg_t msgType);
int32_t connLimitNum; int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock int8_t connLimitLock; // 0: no lock. 1. lock

View File

@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->startTimer = pInit->tfp; pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp; pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp; pRpc->failFastFp = pInit->ffp;
pRpc->noDelayFp = pInit->noDelayFp;
pRpc->connLimitNum = pInit->connLimitNum; pRpc->connLimitNum = pInit->connLimitNum;
if (pRpc->connLimitNum == 0) { if (pRpc->connLimitNum == 0) {
pRpc->connLimitNum = 20; pRpc->connLimitNum = 20;

View File

@ -204,7 +204,7 @@ static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status); static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code);
// handle req from app // handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
@ -617,7 +617,7 @@ void* destroyConnPool(SCliThrd* pThrd) {
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL; pMsg->ctx->task = NULL;
doNotifyApp(pMsg, pThrd); doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
} }
taosMemoryFree(msglist); taosMemoryFree(msglist);
@ -692,13 +692,20 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
SMsgList* list = plist->list; SMsgList* list = plist->list;
if ((list)->numOfConn >= pTransInst->connLimitNum) { if ((list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &(*pMsg)->msg.info.traceId; STraceId* trace = &(*pMsg)->msg.info.traceId;
if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) {
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
*pMsg = NULL;
return NULL;
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg; arg->param1 = *pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
*pMsg = NULL; *pMsg = NULL;
} else { } else {
@ -1394,14 +1401,14 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
} }
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransMsg transMsg = {0}; STransMsg transMsg = {0};
transMsg.contLen = 0; transMsg.contLen = 0;
transMsg.pCont = NULL; transMsg.pCont = NULL;
transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS; transMsg.code = code;
transMsg.msgType = pMsg->msg.msgType + 1; transMsg.msgType = pMsg->msg.msgType + 1;
transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.ahandle = pMsg->ctx->ahandle;
transMsg.info.traceId = pMsg->msg.info.traceId; transMsg.info.traceId = pMsg->msg.info.traceId;
@ -1578,11 +1585,11 @@ static void doFreeTimeoutMsg(void* param) {
SCliMsg* pMsg = arg->param1; SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2; SCliThrd* pThrd = arg->param2;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
int32_t code = TSDB_CODE_RPC_MAX_SESSIONS;
QUEUE_REMOVE(&pMsg->q); QUEUE_REMOVE(&pMsg->q);
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd); doNotifyApp(pMsg, pThrd, code);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {

View File

@ -58,6 +58,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")

View File

@ -381,6 +381,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td29793.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py

View File

@ -51,6 +51,8 @@ class TDSql:
def init(self, cursor, log=True): def init(self, cursor, log=True):
self.cursor = cursor self.cursor = cursor
self.sql = None
print(f"sqllog is :{log}") print(f"sqllog is :{log}")
if (log): if (log):
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])

View File

@ -222,7 +222,7 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult)) tdLog.info(len(tdSql.queryResult))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(251, 252)) tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult)) tdSql.checkEqual(54, len(tdSql.queryResult))

View File

@ -0,0 +1,88 @@
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.csv import *
import os
import taos
import json
from taos import SmlProtocol, SmlPrecision
from taos.error import SchemalessError
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdSql.init(conn.cursor(), True)
def run(self):
conn = taos.connect()
conn.execute("drop database if exists reproduce")
conn.execute("CREATE DATABASE reproduce")
conn.execute("USE reproduce")
# influxDB
conn.execute("drop table if exists meters")
lines1 = ["meters,location=California.LosAngeles groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249000",]
lines2 = ["meters,location=California.LosAngeles,groupid=2 groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249001",]
lines3 = ["meters,location=California.LosAngeles,groupid=2 current=11i32,voltage=221,phase=0.28 1648432611249002",]
try:
conn.schemaless_insert(lines1, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
conn.schemaless_insert(lines2, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
try:
conn.schemaless_insert(lines3, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
# OpenTSDB
conn.execute("drop table if exists meters")
lines1 = ["meters 1648432611249 10i32 location=California.SanFrancisco groupid=2 groupid=3",]
lines2 = ["meters 1648432611250 10i32 groupid=2 location=California.SanFrancisco groupid=3",]
try:
conn.schemaless_insert(lines1, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
try:
conn.schemaless_insert(lines2, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
# OpenTSDB Json
conn.execute("drop table if exists meters")
lines1 = [{"metric": "meters", "timestamp": 1648432611249, "value": "a32", "tags": {"location": "California.SanFrancisco", "groupid": 2, "groupid": 3}}]
lines2 = [{"metric": "meters", "timestamp": 1648432611250, "value": "a32", "tags": {"groupid": 2, "location": "California.SanFrancisco", "groupid": 4}}]
try:
lines = json.dumps(lines1)
conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
# tdSql.checkEqual('expected error', 'no error occurred') TD-29850
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
try:
lines = json.dumps(lines2)
conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
# tdSql.checkEqual('expected error', 'no error occurred') TD-29850
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())