From ac15015cb8e0ffa50db4b2b118767529407b6b9a Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Fri, 26 Apr 2024 08:10:39 +0000 Subject: [PATCH 01/20] Add RpcNoDelayfp function to handle specific message types --- include/libs/transport/trpc.h | 3 +++ include/util/taoserror.h | 1 + source/dnode/mgmt/node_mgmt/src/dmTransport.c | 11 ++++++++- source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 23 ++++++++++++------- source/util/src/terror.c | 1 + 7 files changed, 32 insertions(+), 9 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 460b8962ea..95f70c8ff3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -78,6 +78,7 @@ typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcFFfp)(tmsg_t msgType); +typedef bool (*RpcNoDelayfp)(tmsg_t msgType); typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { @@ -118,6 +119,8 @@ typedef struct SRpcInit { // fail fast fp RpcFFfp ffp; + RpcNoDelayfp noDelayFp; + int32_t connLimitNum; int32_t connLimitLock; int32_t timeToGetConn; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 916de6e715..03a024bb8c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -77,6 +77,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) +#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 754c42b82e..a2355ddd22 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -330,7 +330,13 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) { return false; } } - +static bool rpcNoDelayMsg(tmsg_t msgType) { + if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE || + msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) { + return true; + } + return false; +} int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -356,6 +362,8 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; + rpcInit.noDelayFp = rpcNoDelayMsg; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2; connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); @@ -365,6 +373,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->clientRpc = rpcOpen(&rpcInit); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index cc2c0d4e84..7853e25cff 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,6 +63,7 @@ typedef struct { bool (*startTimer)(int32_t code, tmsg_t msgType); void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); + bool (*noDelayFp)(tmsg_t msgType); int32_t connLimitNum; int8_t connLimitLock; // 0: no lock. 1. lock diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f658947144..5ed2e00acd 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; pRpc->failFastFp = pInit->ffp; + pRpc->noDelayFp = pInit->noDelayFp; pRpc->connLimitNum = pInit->connLimitNum; if (pRpc->connLimitNum == 0) { pRpc->connLimitNum = 20; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4da1f04cd9..dfd7630f35 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -204,7 +204,7 @@ static void cliHandleExcept(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); @@ -617,7 +617,7 @@ void* destroyConnPool(SCliThrd* pThrd) { transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); pMsg->ctx->task = NULL; - doNotifyApp(pMsg, pThrd); + doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); } taosMemoryFree(msglist); @@ -692,13 +692,20 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* list = plist->list; if ((list)->numOfConn >= pTransInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; + if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) { + tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType), + tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); + *pMsg = NULL; + return NULL; + } + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = *pMsg; arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); - tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); - QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); *pMsg = NULL; } else { @@ -1394,14 +1401,14 @@ void cliConnCb(uv_connect_t* req, int status) { } } -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; STransMsg transMsg = {0}; transMsg.contLen = 0; transMsg.pCont = NULL; - transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS; + transMsg.code = code; transMsg.msgType = pMsg->msg.msgType + 1; transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.traceId = pMsg->msg.info.traceId; @@ -1578,11 +1585,11 @@ static void doFreeTimeoutMsg(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; STrans* pTransInst = pThrd->pTransInst; - + int32_t code = TSDB_CODE_RPC_MAX_SESSIONS; QUEUE_REMOVE(&pMsg->q); STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); - doNotifyApp(pMsg, pThrd); + doNotifyApp(pMsg, pThrd, code); taosMemoryFree(arg); } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ab5d3da781..3ef656b2b4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -58,6 +58,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") From c177bfb60bae091a7f70261afe25d6e14a23f13a Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Sun, 28 Apr 2024 01:10:50 +0000 Subject: [PATCH 02/20] Add configurable storage compression documentation --- docs/en/12-taos-sql/03-table.md | 5 ++++- docs/zh/12-taos-sql/03-table.md | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/03-table.md b/docs/en/12-taos-sql/03-table.md index a10abd28a5..fca953584e 100644 --- a/docs/en/12-taos-sql/03-table.md +++ b/docs/en/12-taos-sql/03-table.md @@ -22,7 +22,7 @@ create_subtable_clause: { } create_definition: - col_name column_definition + col_name column_type [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] column_definition: type_name [comment 'string_value'] @@ -50,6 +50,7 @@ table_option: { Only ASCII visible characters can be used with escape character. **Parameter description** + 1. COMMENT: specifies comments for the table. This parameter can be used with supertables, standard tables, and subtables. 2. SMA: specifies functions on which to enable small materialized aggregates (SMA). SMA is user-defined precomputation of aggregates based on data blocks. Enter one of the following values: max, min, or sum This parameter can be used with supertables and standard tables. 3. TTL: specifies the time to live (TTL) for the table. If TTL is specified when creatinga table, after the time period for which the table has been existing is over TTL, TDengine will automatically delete the table. Please be noted that the system may not delete the table at the exact moment that the TTL expires but guarantee there is such a system and finally the table will be deleted. The unit of TTL is in days. The default value is 0, i.e. never expire. @@ -103,6 +104,7 @@ alter_table_option: { **More explanations** You can perform the following modifications on existing tables: + 1. ADD COLUMN: adds a column to the supertable. 2. DROP COLUMN: deletes a column from the supertable. 3. MODIFY COLUMN: changes the length of the data type specified for the column. Note that you can only specify a length greater than the current length. @@ -152,6 +154,7 @@ alter_table_option: { ``` **More explanations** + 1. Only the value of a tag can be modified directly. For all other modifications, you must modify the supertable from which the subtable was created. ### Change Tag Value Of Sub Table diff --git a/docs/zh/12-taos-sql/03-table.md b/docs/zh/12-taos-sql/03-table.md index 7e20f20574..0205145904 100644 --- a/docs/zh/12-taos-sql/03-table.md +++ b/docs/zh/12-taos-sql/03-table.md @@ -23,7 +23,7 @@ create_subtable_clause: { } create_definition: - col_name column_type + col_name column_type [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] table_options: table_option ... From 8adaeb59a3312882bf77be921571bc2345477e45 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 09:50:21 +0800 Subject: [PATCH 03/20] refactor: do some internal refactor. --- include/common/cos.h | 2 +- source/common/src/cos.c | 2 +- source/libs/stream/inc/streamInt.h | 4 ++-- source/libs/stream/src/streamCheckpoint.c | 22 ++++++++++++++------- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamStartHistory.c | 4 ++-- 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/include/common/cos.h b/include/common/cos.h index 8e48533304..17c48d594b 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, int32_t s3GetObjectsByPrefix(const char *prefix, const char *path); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); -int32_t s3GetObjectToFile(const char *object_name, char *fileName); +int32_t s3GetObjectToFile(const char *object_name, const char *fileName); #define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 990bfdcea3..8ad5fb36b5 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); } -int32_t s3GetObjectToFile(const char *object_name, char *fileName) { +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 07dce9a451..45a75ea5e7 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); -void streamTaskSetCheckpointFailedId(SStreamTask* pTask); +void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); STaskId streamTaskGetTaskId(const SStreamTask* pTask); @@ -161,7 +161,7 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path); int32_t deleteCheckpoint(char* id); int32_t deleteCheckpointFile(char* id, char* name); -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName); +//int32_t downloadCheckpointDataByName(const char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e6d7c2fde8..8244df2995 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -26,6 +26,8 @@ typedef struct { SStreamTask* pTask; } SAsyncUploadArg; +static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); + int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -376,21 +378,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { return code; } -void streamTaskSetCheckpointFailedId(SStreamTask* pTask) { +void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); } -int32_t getChkpMeta(char* id, char* path, SArray* list) { +static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { char* file = taosMemoryCalloc(1, strlen(path) + 32); sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); - int32_t code = downloadCheckpointByName(id, "META", file); + + int32_t code = downloadCheckpointDataByName(id, "META", file); if (code != 0) { stDebug("chkp failed to download meta file:%s", file); taosMemoryFree(file); return code; } + TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); char buf[128] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { @@ -427,7 +431,7 @@ int32_t uploadCheckpointData(void* param) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } if (arg->type == DATA_UPLOAD_S3) { - if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { + if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); } } @@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } @@ -590,7 +594,7 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { return 0; } -static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { +static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); @@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) { } // fileName: CURRENT -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { +int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { stError("uploadCheckpointByName parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return 0; } else if (tsS3StreamEnabled) { return downloadCheckpointByNameS3(id, fname, dstName); } + return 0; } @@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) { stError("downloadCheckpoint parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return downloadRsync(id, path); } else if (tsS3StreamEnabled) { return s3GetObjectsByPrefix(id, path); } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a464594233..03f8d2adfd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } else { stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index c76536aedf..b3df5755ea 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock); } @@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock); From b2d8260f14e5d7a50249b59f27d0ed83c4ae0882 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 10:05:22 +0800 Subject: [PATCH 04/20] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 7 +++++++ source/libs/stream/inc/streamInt.h | 2 -- source/libs/stream/src/streamCheckpoint.c | 8 ++++---- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c74a9fd7b..119a77cbe8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -547,7 +547,7 @@ typedef struct SStreamMeta { SArray* chkpSaved; SArray* chkpInUse; SRWLatch chkpDirLock; - void* qHandle; + void* qHandle; // todo remove it void* bkdChkptMgt; } SStreamMeta; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 14e8088dfe..7c868fcbe4 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 844aae0f57..f6d86ad317 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + // checkpoint backup type + char backup[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(backup, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)backup, false); + + // history scan idle char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; strcpy(scanHistoryIdle, "100a"); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 45a75ea5e7..0ee31197dc 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path); int32_t deleteCheckpoint(char* id); -int32_t deleteCheckpointFile(char* id, char* name); -//int32_t downloadCheckpointDataByName(const char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8244df2995..3428fc36e1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -27,6 +27,7 @@ typedef struct { } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); +static int32_t deleteCheckpointFile(char* id, char* name); int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -461,8 +462,7 @@ int32_t uploadCheckpointData(void* param) { return code; } -int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { - // async upload +int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { return 0; @@ -518,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { code = streamSaveTaskCheckpointInfo(pTask, ckId); if (code == TSDB_CODE_SUCCESS) { - code = streamTaskUploadChkp(pTask, ckId, (char*)id); + code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); } @@ -589,8 +589,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { stDebug("[s3] upload checkpoint:%s", filename); // break; } - taosCloseDir(&pDir); + taosCloseDir(&pDir); return 0; } From ed962186a11cce02dd9c562cfc3370912e50b19a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 10:22:09 +0800 Subject: [PATCH 05/20] enh(stream): add attrs for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 7c868fcbe4..bf2f14339d 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -194,6 +194,7 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f6d86ad317..4e1adcc366 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1651,10 +1651,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false); - // checkpoint info + // checkpoint version pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); + // checkpoint backup status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + // ds_err_info pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, 0, true); From 2d1c07546b988132a376001b260f9d75b9d423e2 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Sun, 28 Apr 2024 02:52:37 +0000 Subject: [PATCH 06/20] Add configurable storage compression documentation --- docs/en/12-taos-sql/03-table.md | 2 +- docs/zh/12-taos-sql/03-table.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/03-table.md b/docs/en/12-taos-sql/03-table.md index fca953584e..1419cddcf8 100644 --- a/docs/en/12-taos-sql/03-table.md +++ b/docs/en/12-taos-sql/03-table.md @@ -22,7 +22,7 @@ create_subtable_clause: { } create_definition: - col_name column_type [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] + col_name column_type [PRIMARY_KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] column_definition: type_name [comment 'string_value'] diff --git a/docs/zh/12-taos-sql/03-table.md b/docs/zh/12-taos-sql/03-table.md index 0205145904..7514b14fcd 100644 --- a/docs/zh/12-taos-sql/03-table.md +++ b/docs/zh/12-taos-sql/03-table.md @@ -23,7 +23,7 @@ create_subtable_clause: { } create_definition: - col_name column_type [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] + col_name column_type [PRIMARY_KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] table_options: table_option ... From 23599809d79a15b5696b5fe3ddcb58bc9c4eec05 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Sun, 28 Apr 2024 03:08:13 +0000 Subject: [PATCH 07/20] Add configurable storage compression documentation --- docs/en/12-taos-sql/03-table.md | 2 +- docs/zh/12-taos-sql/03-table.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/03-table.md b/docs/en/12-taos-sql/03-table.md index 1419cddcf8..c35c7efa89 100644 --- a/docs/en/12-taos-sql/03-table.md +++ b/docs/en/12-taos-sql/03-table.md @@ -22,7 +22,7 @@ create_subtable_clause: { } create_definition: - col_name column_type [PRIMARY_KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] + col_name column_type [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] column_definition: type_name [comment 'string_value'] diff --git a/docs/zh/12-taos-sql/03-table.md b/docs/zh/12-taos-sql/03-table.md index 7514b14fcd..71500a78e4 100644 --- a/docs/zh/12-taos-sql/03-table.md +++ b/docs/zh/12-taos-sql/03-table.md @@ -23,7 +23,7 @@ create_subtable_clause: { } create_definition: - col_name column_type [PRIMARY_KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] + col_name column_type [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] table_options: table_option ... From 6ea4823f1e87f6b8397865c74ae61ab2520976a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 11:14:10 +0800 Subject: [PATCH 08/20] fix(stream): update the timeout measurement. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckStatus.c | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 119a77cbe8..9b7a433c18 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -443,6 +443,7 @@ typedef struct SDownstreamStatusInfo { typedef struct STaskCheckInfo { SArray* pList; int64_t startTs; + int64_t timeoutStartTs; int32_t notReadyTasks; int32_t inCheckProcess; int32_t stopCheckProcess; diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 0a87833055..152f890cc6 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -272,6 +272,7 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut } pInfo->startTs = startTs; + pInfo->timeoutStartTs = startTs; return TSDB_CODE_SUCCESS; } @@ -346,6 +347,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; + pInfo->timeoutStartTs = 0; pInfo->notReadyTasks = 0; pInfo->inCheckProcess = 0; pInfo->stopCheckProcess = 0; @@ -458,6 +460,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); ASSERT(pTask->status.downstreamReady == 0); + pInfo->timeoutStartTs = taosGetTimestampMs(); for (int32_t i = 0; i < numOfTimeout; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); @@ -488,7 +491,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); } else { stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs); } } @@ -524,7 +527,7 @@ void rspMonitorFn(void* param, void* tmrId) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); - int64_t el = now - pInfo->startTs; + int64_t timeoutDuration = now - pInfo->timeoutStartTs; ETaskStatus state = pStat->state; const char* id = pTask->id.idStr; int32_t numOfReady = 0; @@ -577,7 +580,7 @@ void rspMonitorFn(void* param, void* tmrId) { SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); if (pStat->state == TASK_STATUS__UNINIT) { - getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); + getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } @@ -639,8 +642,10 @@ void rspMonitorFn(void* param, void* tmrId) { taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", - id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug( + "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, " + "ready:%d", + id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); From 88cc43250f62a4865a11d4f608dc5ff03fe29887 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Sun, 28 Apr 2024 11:32:17 +0800 Subject: [PATCH 09/20] use debug log when fetching no tsmas for table --- source/dnode/mnode/impl/src/mndSma.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 02c932289f..d7b0b2d09d 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -2340,11 +2340,6 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) { } _OVER: - if (code != 0) { - mError("failed to get table tsma %s since %s fetching with tsma name %d", tsmaReq.name, terrstr(), - tsmaReq.fetchingWithTsmaName); - } - tFreeTableTSMAInfoRsp(&rsp); return code; } From 5a3755e892e5e36929e6c616309b9f71a1ae9616 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 13:30:46 +0800 Subject: [PATCH 10/20] fix(test): fix test cases. --- tests/system-test/0-others/information_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 56ef8c6b47..137fe82178 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(251, 252)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(252, 253)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) From 57ee97814fabca2b698e1795d244e0257469ba8e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 14:29:18 +0800 Subject: [PATCH 11/20] fix(stream): fix failed to launch timer bug. --- source/libs/stream/src/streamCheckStatus.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 152f890cc6..eee1332821 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -175,7 +175,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); + stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref); if (pInfo->checkRspTmr == NULL) { pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); @@ -194,7 +194,7 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { pInfo->stopCheckProcess = 1; taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s set stop check rsp mon", id); + stDebug("s-task:%s set stop check-rsp monit", id); return TSDB_CODE_SUCCESS; } @@ -273,6 +273,7 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut pInfo->startTs = startTs; pInfo->timeoutStartTs = startTs; + pInfo->stopCheckProcess = 0; return TSDB_CODE_SUCCESS; } @@ -330,7 +331,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { return TSDB_CODE_FAILED; } - stDebug("s-task:%s set the in-check-procedure flag", id); + stDebug("s-task:%s set the in check-rsp flag", id); return TSDB_CODE_SUCCESS; } @@ -344,7 +345,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* } int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; - stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; pInfo->timeoutStartTs = 0; From 27a10c944a822b9b7f7911cbe6916c20914810de Mon Sep 17 00:00:00 2001 From: Chris Zhai Date: Sun, 28 Apr 2024 15:27:57 +0800 Subject: [PATCH 12/20] add test scripts for td29793 --- tests/pytest/util/sql.py | 2 + tests/system-test/1-insert/test_td29793.py | 88 ++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 tests/system-test/1-insert/test_td29793.py diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index b46326bb3c..00171a19a6 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -51,6 +51,8 @@ class TDSql: def init(self, cursor, log=True): self.cursor = cursor + self.sql = None + print(f"sqllog is :{log}") if (log): caller = inspect.getframeinfo(inspect.stack()[1][0]) diff --git a/tests/system-test/1-insert/test_td29793.py b/tests/system-test/1-insert/test_td29793.py new file mode 100644 index 0000000000..cdcaa244bb --- /dev/null +++ b/tests/system-test/1-insert/test_td29793.py @@ -0,0 +1,88 @@ +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.csv import * +import os +import taos +import json +from taos import SmlProtocol, SmlPrecision +from taos.error import SchemalessError + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdSql.init(conn.cursor(), True) + + + def run(self): + conn = taos.connect() + + conn.execute("drop database if exists reproduce") + conn.execute("CREATE DATABASE reproduce") + conn.execute("USE reproduce") + + # influxDB + conn.execute("drop table if exists meters") + lines1 = ["meters,location=California.LosAngeles groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249000",] + lines2 = ["meters,location=California.LosAngeles,groupid=2 groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249001",] + lines3 = ["meters,location=California.LosAngeles,groupid=2 current=11i32,voltage=221,phase=0.28 1648432611249002",] + + try: + conn.schemaless_insert(lines1, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS) + conn.schemaless_insert(lines2, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + try: + conn.schemaless_insert(lines3, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + + # OpenTSDB + conn.execute("drop table if exists meters") + lines1 = ["meters 1648432611249 10i32 location=California.SanFrancisco groupid=2 groupid=3",] + lines2 = ["meters 1648432611250 10i32 groupid=2 location=California.SanFrancisco groupid=3",] + + try: + conn.schemaless_insert(lines1, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + try: + conn.schemaless_insert(lines2, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + # OpenTSDB Json + conn.execute("drop table if exists meters") + lines1 = [{"metric": "meters", "timestamp": 1648432611249, "value": "a32", "tags": {"location": "California.SanFrancisco", "groupid": 2, "groupid": 3}}] + lines2 = [{"metric": "meters", "timestamp": 1648432611250, "value": "a32", "tags": {"groupid": 2, "location": "California.SanFrancisco", "groupid": 4}}] + try: + lines = json.dumps(lines1) + conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + # tdSql.checkEqual('expected error', 'no error occurred') TD-29850 + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + try: + lines = json.dumps(lines2) + conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + # tdSql.checkEqual('expected error', 'no error occurred') TD-29850 + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From 94f0dca425520991017e80f075f6551bf8361b2f Mon Sep 17 00:00:00 2001 From: Chris Zhai Date: Sun, 28 Apr 2024 15:52:18 +0800 Subject: [PATCH 13/20] add test_td29793 to cases.task --- tests/parallel_test/cases.task | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 3d1e8d2250..3fca381fda 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -381,6 +381,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td29793.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py From d65302393c635f1c8e97b9a6308f033a750093ed Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sun, 28 Apr 2024 16:08:30 +0800 Subject: [PATCH 14/20] fix: add default task queue thread number --- source/common/src/tglobal.c | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cad1145a6b..d80785a6f8 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,7 +60,7 @@ int32_t tsTimeToGetAvailableConn = 500000; int32_t tsKeepAliveIdle = 60; int32_t tsNumOfCommitThreads = 2; -int32_t tsNumOfTaskQueueThreads = 4; +int32_t tsNumOfTaskQueueThreads = 10; int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; @@ -552,12 +552,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH) != 0) return -1; - tsNumOfTaskQueueThreads = tsNumOfCores / 2; - tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); + tsNumOfTaskQueueThreads = tsNumOfCores; + tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 10); - if (tsNumOfTaskQueueThreads >= 50) { - tsNumOfTaskQueueThreads = 50; - } if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; From 037b394bd64261bd599201dca47dab487b7771da Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Sun, 28 Apr 2024 08:09:44 +0000 Subject: [PATCH 15/20] Refactor table creation code --- docs/en/12-taos-sql/03-table.md | 4 ++-- docs/zh/12-taos-sql/03-table.md | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/en/12-taos-sql/03-table.md b/docs/en/12-taos-sql/03-table.md index c94e536cf6..ca22a6ace7 100644 --- a/docs/en/12-taos-sql/03-table.md +++ b/docs/en/12-taos-sql/03-table.md @@ -22,10 +22,10 @@ create_subtable_clause: { } create_definition: - col_name column_type [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] + col_name column_definition column_definition: - type_name [comment 'string_value'] [PRIMARY KEY] + type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] table_options: table_option ... diff --git a/docs/zh/12-taos-sql/03-table.md b/docs/zh/12-taos-sql/03-table.md index a6df940133..773ce75430 100644 --- a/docs/zh/12-taos-sql/03-table.md +++ b/docs/zh/12-taos-sql/03-table.md @@ -23,7 +23,10 @@ create_subtable_clause: { } create_definition: - col_name column_type [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] + col_name column_definition + +column_definition: + type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] table_options: table_option ... From bf5d523116de0d0ae0b5572e4b869557601ba1d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 17:57:34 +0800 Subject: [PATCH 16/20] fix(stream): disable the exec of complete check status in timer thread. --- include/libs/stream/tstream.h | 2 + source/dnode/vnode/src/tqCommon/tqCommon.c | 67 ++++------------------ source/libs/stream/src/streamCheckStatus.c | 41 ++++++++----- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamMeta.c | 28 +++++++++ 5 files changed, 69 insertions(+), 71 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0aa00d50b4..e3487c49d1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -56,6 +56,7 @@ extern "C" { #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) +#define STREAM_EXEC_T_ADD_FAILED_TASK (-7) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -886,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 04c0c0d204..924b0a8207 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +typedef struct SMStreamCheckpointReadyRspMsg { + SMsgHead head; +} SMStreamCheckpointReadyRspMsg; + static STaskId replaceStreamTaskId(SStreamTask* pTask) { ASSERT(pTask->info.fillHistory); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; @@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - int64_t initTs = 0; - int64_t now = taosGetTimestampMs(); - STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId}; - STaskId fId = {0}; - bool hasHistoryTask = false; - - // todo extract method if (!isLeader) { - // this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map. - streamMetaRLock(pMeta); - - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - - tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, - rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - } else { - streamMetaRUnLock(pMeta); - - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - return code; + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { - streamMetaRLock(pMeta); - - // let's try to find this task in hashmap - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - } else { // not exist even in the hash map of meta, forget it - streamMetaRUnLock(pMeta); - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return code; + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } code = streamProcessCheckRsp(pTask, &rsp); @@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return code; } -typedef struct SMStreamCheckpointReadyRspMsg { - SMsgHead head; -} SMStreamCheckpointReadyRspMsg; - int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { streamMetaStopAllTasks(pMeta); return 0; + } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { + int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index eee1332821..ea9b2ef89f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -295,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; @@ -521,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); } +// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. +// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution +// of restart in timer thread will result in a dead lock. +static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr()); + return -1; + } + + stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = streamId; + pRunReq->taskId = taskId; + pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + return 0; +} + +// this function is executed in timer thread void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamMeta* pMeta = pTask->pMeta; @@ -545,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); - - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return; @@ -618,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. - streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } - + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); taosArrayDestroy(pNotReadyList); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 891e0aa142..250866005e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ -int32_t doStreamExecTask(SStreamTask* pTask) { +static int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 03f8d2adfd..210199b912 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1706,4 +1706,32 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { } return 0; +} + +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t code = TSDB_CODE_SUCCESS; + + streamMetaWLock(pMeta); + stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); + + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + + if (ppTask != NULL) { + STaskCheckInfo* pInfo = &(*ppTask)->taskCheckInfo; + int64_t now = taosGetTimestampMs(); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->startTs, now, false); + + if (HAS_RELATED_FILLHISTORY_TASK(*ppTask)) { + STaskId hId = (*ppTask)->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, pInfo->startTs, now, false); + } + } else { + stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + streamId, taskId, pMeta->vgId); + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + streamMetaWUnLock(pMeta); + return code; } \ No newline at end of file From 82d560ddb314a4e3ffd02cf15724d94fd5858d02 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 17:58:47 +0800 Subject: [PATCH 17/20] fix(test): fix test cases. --- tests/system-test/0-others/information_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 137fe82178..9a112c669e 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(252, 253)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) From b990632e8d2adb44bb86195526542a371b60fe9f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 18:29:18 +0800 Subject: [PATCH 18/20] fix(stream): fix dead lock. --- source/libs/stream/src/streamMeta.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 210199b912..2f9a579bcc 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1711,7 +1711,7 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t code = TSDB_CODE_SUCCESS; - streamMetaWLock(pMeta); + streamMetaRLock(pMeta); stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); STaskId id = {.streamId = streamId, .taskId = taskId}; @@ -1732,6 +1732,6 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta code = TSDB_CODE_STREAM_TASK_NOT_EXIST; } - streamMetaWUnLock(pMeta); + streamMetaRUnLock(pMeta); return code; } \ No newline at end of file From 6c93fe559344a332b1acd70fee098fe50dd1fe37 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 18:35:00 +0800 Subject: [PATCH 19/20] fix(stream): fix dead lock. --- source/libs/stream/src/streamMeta.c | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2f9a579bcc..edc1a148a9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1710,21 +1710,29 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t code = TSDB_CODE_SUCCESS; + int64_t now = taosGetTimestampMs(); + int64_t startTs = 0; + bool hasFillhistoryTask = false; + STaskId hId = {0}; + + stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); streamMetaRLock(pMeta); - stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { - STaskCheckInfo* pInfo = &(*ppTask)->taskCheckInfo; - int64_t now = taosGetTimestampMs(); - streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->startTs, now, false); + startTs = (*ppTask)->taskCheckInfo.startTs; + hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); + hId = (*ppTask)->hTaskInfo.id; - if (HAS_RELATED_FILLHISTORY_TASK(*ppTask)) { - STaskId hId = (*ppTask)->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, pInfo->startTs, now, false); + streamMetaRUnLock(pMeta); + + // add the failed task info, along with the related fill-history task info into tasks list. + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); } } else { stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", @@ -1732,6 +1740,5 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta code = TSDB_CODE_STREAM_TASK_NOT_EXIST; } - streamMetaRUnLock(pMeta); return code; } \ No newline at end of file From f16bd528a5b7e4cc817c1f041cc4f3f26f67f1f7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 21:53:50 +0800 Subject: [PATCH 20/20] fix(cos): fix syntax error. --- source/common/src/cos.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 8ad5fb36b5..0db6664ab9 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1733,6 +1733,6 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; } -int32_t s3GetObjectToFile(const char *object_name, char *fileName) { return 0; } +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { return 0; } #endif