From 30058f761940089b9e6a10bda7d25c697eee8d2d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 8 Nov 2022 10:46:32 +0800 Subject: [PATCH 1/5] enh: optimize the error code when create and delete a database in a dropping state --- include/common/tmsg.h | 1 + include/util/taoserror.h | 22 ++++++++--- source/client/src/clientMsgHandler.c | 6 ++- source/common/src/tmsg.c | 2 + source/dnode/mnode/impl/src/mndDb.c | 50 +++++++++++++------------ source/dnode/mnode/impl/src/mndStream.c | 7 +--- source/util/src/tarray.c | 2 + source/util/src/terror.c | 2 + 8 files changed, 57 insertions(+), 35 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8aa85fa6b4..aabc84659c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -852,6 +852,7 @@ typedef struct { int16_t hashSuffix; int8_t hashMethod; SArray* pVgroupInfos; // Array of SVgroupInfo + int32_t errCode; } SUseDbRsp; int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 447a90447a..a03dc7d9f9 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -154,7 +154,7 @@ int32_t* taosGetErrno(); // mnode-sdb #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320) -#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0321) +#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0321) // internal #define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0322) #define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0323) #define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325) @@ -218,14 +218,24 @@ int32_t* taosGetErrno(); // mnode-db #define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) -#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) -#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0382) +#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) // +#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0382) // #define TSDB_CODE_MND_INVALID_DB TAOS_DEF_ERROR_CODE(0, 0x0383) +// #define TSDB_CODE_MND_MONITOR_DB_FORBIDDEN TAOS_DEF_ERROR_CODE(0, 0x0384) // 2.x #define TSDB_CODE_MND_TOO_MANY_DATABASES TAOS_DEF_ERROR_CODE(0, 0x0385) -#define TSDB_CODE_MND_DB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0388) -#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0389) -#define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x038A) +#define TSDB_CODE_MND_DB_IN_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0386) // +// #define TSDB_CODE_MND_VGROUP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0387) // 2.x +#define TSDB_CODE_MND_DB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0388) // +#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0389) // internal +#define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x038A) // #define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x038B) +// #define TSDB_CODE_MND_INVALID_DB_OPTION_DAYS TAOS_DEF_ERROR_CODE(0, 0x0390) // 2.x +// #define TSDB_CODE_MND_INVALID_DB_OPTION_KEEP TAOS_DEF_ERROR_CODE(0, 0x0391) // 2.x +// #define TSDB_CODE_MND_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x0392) // 2.x +// #define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x0393) // 2.x +// #define TSDB_CODE_MND_INVALID_TOPIC_PARTITONSTAOS_DEF_ERROR_CODE(0, 0x0394) // 2.x +// #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) // 2.x +#define TSDB_CODE_MND_DB_IN_CREATING TAOS_DEF_ERROR_CODE(0, 0x0396) // #define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A) // mnode-node diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 938cc4e41d..1b42005a3c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -212,7 +212,11 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); if (strlen(usedbRsp.db) == 0) { - return TSDB_CODE_MND_DB_NOT_EXIST; + if (usedbRsp.errCode != 0) { + return usedbRsp.errCode; + } else { + return TSDB_CODE_APP_ERROR; + } } SName name = {0}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4eaa934676..c39d4b0e11 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2489,6 +2489,7 @@ int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) { if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1; } + if (tEncodeI32(pEncoder, pRsp->errCode) < 0) return -1; return 0; } @@ -2553,6 +2554,7 @@ int32_t tDeserializeSUseDbRspImp(SDecoder *pDecoder, SUseDbRsp *pRsp) { taosArrayPush(pRsp->pVgroupInfos, &vgInfo); } + if (tDecodeI32(pDecoder, &pRsp->errCode) < 0) return -1; return 0; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index aad2832aa6..992acd9dee 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -285,8 +285,17 @@ static inline int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) { SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db); - if (pDb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; + if (pDb == NULL) { + if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) { + terrno = TSDB_CODE_MND_DB_IN_CREATING; + } else if (terrno = TSDB_CODE_SDB_OBJ_DROPPING) { + terrno = TSDB_CODE_MND_DB_IN_DROPPING; + } else { + terrno = TSDB_CODE_APP_ERROR; + mFatal("db:%s, failed to acquire db since %s", db, terrstr()); + } } return pDb; } @@ -594,16 +603,22 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_DB_ALREADY_EXIST; goto _OVER; } - } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) { - if (mndSetRpcInfoForDbTrans(pMnode, pReq, MND_OPER_CREATE_DB, createReq.db) == 0) { - mInfo("db:%s, is creating and response after trans finished", createReq.db); - code = TSDB_CODE_ACTION_IN_PROGRESS; + } else { + if (terrno == TSDB_CODE_MND_DB_IN_CREATING) { + if (mndSetRpcInfoForDbTrans(pMnode, pReq, MND_OPER_CREATE_DB, createReq.db) == 0) { + mInfo("db:%s, is creating and response after trans finished", createReq.db); + code = TSDB_CODE_ACTION_IN_PROGRESS; + goto _OVER; + } else { + goto _OVER; + } + } else if (terrno == TSDB_CODE_MND_DB_IN_DROPPING) { goto _OVER; - } else { + } else if (terrno == TSDB_CODE_MND_DB_NOT_EXIST) { + // continue + } else { // TSDB_CODE_APP_ERROR goto _OVER; } - } else if (terrno != TSDB_CODE_MND_DB_NOT_EXIST) { - goto _OVER; } pUser = mndAcquireUser(pMnode, pReq->info.conn.user); @@ -786,7 +801,6 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { pDb = mndAcquireDb(pMnode, alterReq.db); if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; goto _OVER; } @@ -836,7 +850,6 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) { pDb = mndAcquireDb(pMnode, cfgReq.db); if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; goto _OVER; } @@ -1066,11 +1079,8 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) { if (pDb == NULL) { if (dropReq.ignoreNotExists) { code = mndBuildDropDbRsp(pDb, &pReq->info.rspLen, &pReq->info.rsp, true); - goto _OVER; - } else { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; } + goto _OVER; } if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) { @@ -1197,10 +1207,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) { int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode); if (usedbReq.vgVersion < vgVersion) { usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); - if (usedbRsp.pVgroupInfos == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } + if (usedbRsp.pVgroupInfos == NULL) goto _OVER; mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); usedbRsp.vgVersion = vgVersion++; @@ -1209,16 +1216,13 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) { } usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); code = 0; - - // no jump, need to construct rsp } else { pDb = mndAcquireDb(pMnode, usedbReq.db); if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); usedbRsp.uid = usedbReq.dbId; usedbRsp.vgVersion = usedbReq.vgVersion; + usedbRsp.errCode = terrno; mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr()); } else { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 36ba0aaf87..594c13f957 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -287,9 +287,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); if (pSourceDb == NULL) { - /*ASSERT(0);*/ - mInfo("stream:%s failed to create, source db %s not exist", pCreate->name, pObj->sourceDb); - terrno = TSDB_CODE_MND_DB_NOT_EXIST; + mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); return -1; } pObj->sourceDbUid = pSourceDb->uid; @@ -298,8 +296,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); if (pTargetDb == NULL) { - mInfo("stream:%s failed to create, target db %s not exist", pCreate->name, pObj->targetDb); - terrno = TSDB_CODE_MND_DB_NOT_EXIST; + mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); return -1; } tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 309e6b30ae..95065972a3 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -26,12 +26,14 @@ SArray* taosArrayInit(size_t size, size_t elemSize) { SArray* pArray = taosMemoryMalloc(sizeof(SArray)); if (pArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pArray->size = 0; pArray->pData = taosMemoryCalloc(size, elemSize); if (pArray->pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pArray); return NULL; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f7e56f372f..0e6568d692 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -225,11 +225,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exis TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, "Too many databases for account") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_DROPPING, "Database in dropping status") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_EXIST, "Database not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating status") // mnode-node TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists") From 36450e6dd7d05a1c475f2b9e3d99b9e3dfb2ea20 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 8 Nov 2022 11:08:14 +0800 Subject: [PATCH 2/5] enh: optimize the error code when create and delete a database in a dropping state --- source/client/src/clientMsgHandler.c | 3 ++- source/libs/catalog/inc/catalogInt.h | 3 ++- source/libs/parser/src/parTranslater.c | 6 ++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 1b42005a3c..d46ff7fe5b 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -175,7 +175,8 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) { int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; - if (TSDB_CODE_MND_DB_NOT_EXIST == code) { + if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code || + TSDB_CODE_MND_DB_IN_DROPPING == code) { SUseDbRsp usedbRsp = {0}; tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); struct SCatalog* pCatalog = NULL; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 8c699bb59b..4d3a3a1ab4 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -538,7 +538,8 @@ typedef struct SCtgOperation { (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) -#define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST) +#define CTG_DB_NOT_EXIST(code) \ + (code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING) #define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d3ff787f50..90bc5c0e3b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2181,7 +2181,8 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge if (TSDB_DB_NAME_T == pTargetName->type) { int32_t code = getDBVgInfoImpl(pCxt, pTargetName, pVgroupList); - if (TSDB_CODE_MND_DB_NOT_EXIST == code) { + if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code || + TSDB_CODE_MND_DB_IN_DROPPING == code) { code = TSDB_CODE_SUCCESS; } return code; @@ -2196,7 +2197,8 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge } else { taosArrayPush(*pVgroupList, &vgInfo); } - } else if (TSDB_CODE_MND_DB_NOT_EXIST == code) { + } else if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code || + TSDB_CODE_MND_DB_IN_DROPPING == code) { code = TSDB_CODE_SUCCESS; } return code; From bc469f7f7b82db40296a3261161ac582a4a29bf5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 8 Nov 2022 11:09:19 +0800 Subject: [PATCH 3/5] fix(stream): delete tb should be checked in write thread --- include/common/tmsg.h | 3 +- include/libs/stream/tstream.h | 2 + source/client/src/clientTmq.c | 4 +- source/common/src/tmsg.c | 4 +- source/dnode/vnode/src/tq/tqSink.c | 16 +++- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 14 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 20 ++++- source/libs/executor/inc/executorimpl.h | 13 ++- source/libs/executor/src/executorimpl.c | 35 ++++---- source/libs/executor/src/tfill.c | 6 +- source/libs/executor/src/timewindowoperator.c | 56 ++++++------- source/libs/stream/inc/streamInc.h | 2 - source/libs/stream/src/stream.c | 37 --------- source/libs/stream/src/streamExec.c | 82 +++++++------------ source/libs/wal/src/walWrite.c | 3 +- source/util/src/tqueue.c | 9 +- 16 files changed, 134 insertions(+), 172 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8aa85fa6b4..c2cd7de139 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3133,7 +3133,8 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); typedef struct { - int64_t uid; + // int64_t uid; + char tbname[TSDB_TABLE_NAME_LEN]; int64_t ts; } SSingleDeleteReq; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1c3f905e23..0354078b7b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -317,6 +317,8 @@ typedef struct SStreamTask { int8_t inputStatus; int8_t outputStatus; + // STaosQueue* inputQueue1; + // STaosQall* inputQall; SStreamQueue* inputQueue; SStreamQueue* outputQueue; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8e7faf48f6..ab44236d96 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1649,7 +1649,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->dataRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } @@ -1667,7 +1667,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->metaRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4eaa934676..31f532b6ce 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6129,13 +6129,13 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { } int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { - if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1; if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; return 0; } int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) { - if (tDecodeI64(pDecoder, &pReq->uid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; return 0; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 65e8d69994..913fa67bd6 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -25,6 +25,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + tqDebug("stream delete msg: row %d", totRow); + for (int32_t row = 0; row < totRow; row++) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); @@ -36,11 +38,14 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl } else { name = buildCtbNameByGroupId(stbFullName, groupId); } - tqDebug("stream delete msg: groupId :%" PRId64 ", name: %s", groupId, name); + tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId, + name, ts); +#if 0 SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mr, name) < 0) { metaReaderClear(&mr); + tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); taosMemoryFree(name); continue; } @@ -48,10 +53,13 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl int64_t uid = mr.me.uid; metaReaderClear(&mr); taosMemoryFree(name); +#endif SSingleDeleteReq req = { .ts = ts, - .uid = uid, }; + strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN); + taosMemoryFree(name); + /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/ taosArrayPush(deleteReq->deleteReqs, &req); } return 0; @@ -309,6 +317,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.suid = suid; tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); + if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { + taosArrayDestroy(deleteReq.deleteReqs); + continue; + } int32_t len; int32_t code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 48b3e9ff77..c663e2b526 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -117,12 +117,12 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info); } if (pMsgIter->sversion != info.skmVer) { - tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, - TD_VID(pTsdb->pVnode), pMsgIter->sversion, info.skmVer, suid, uid); + tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, TD_VID(pTsdb->pVnode), + pMsgIter->sversion, info.skmVer, suid, uid); code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; goto _err; } - + pRsp->sver = info.skmVer; // create/get STbData to op @@ -198,14 +198,14 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid } tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 - " since %s", - TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); + " at version %" PRId64 " since %s", + TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code)); return code; _err: tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 - " since %s", - TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); + " at version %" PRId64 " since %s", + TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 089905ee20..6883354547 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -945,7 +945,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); tbCreated = true; } - + msgIter.uid = createTbReq.uid; if (createTbReq.type == TSDB_CHILD_TABLE) { msgIter.suid = createTbReq.ctb.suid; @@ -958,7 +958,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq #endif tDecoderClear(&decoder); taosArrayDestroy(createTbReq.ctb.tagName); - } + } if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { submitBlkRsp.code = terrno; @@ -1169,16 +1169,28 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void tDecoderInit(&decoder, pReq, len); tDecodeSBatchDeleteReq(&decoder, &deleteReq); + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); for (int32_t i = 0; i < sz; i++) { SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i); - int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts); + char *name = pOneReq->tbname; + if (metaGetTableEntryByName(&mr, name) < 0) { + vDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); + continue; + } + + int64_t uid = mr.me.uid; + + int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); if (code < 0) { terrno = code; vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, - TD_VID(pVnode), terrstr(), deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts); + TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); } } + metaReaderClear(&mr); taosArrayDestroy(deleteReq.deleteReqs); return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cd9f29978d..771ab10140 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -87,11 +87,11 @@ typedef struct SLimit { typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; typedef struct STaskCostInfo { - int64_t created; - int64_t start; - uint64_t elapsedTime; - double extractListTime; - double groupIdMapTime; + int64_t created; + int64_t start; + uint64_t elapsedTime; + double extractListTime; + double groupIdMapTime; SFileBlockLoadRecorder* pRecoder; } STaskCostInfo; @@ -184,8 +184,7 @@ enum { typedef struct SOperatorFpSet { __optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_fn_t getNextFn; - __optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b7c3eed069..efb9a49df2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -114,7 +114,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, SOperatorFpSet fpSet = { ._openFn = openFn, .getNextFn = nextFn, - .getStreamResFn = streamFn, .cleanupFn = cleanup, .closeFn = closeFn, .getExplainFn = explain, @@ -819,7 +818,7 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) { // abort current query execution. if (pTaskInfo->owner != 0 && ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) - /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { + /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { assert(pTaskInfo->cost.start != 0); // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); @@ -1899,7 +1898,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn // printf("%d completed, try next\n", i); qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); @@ -1984,7 +1983,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } } - _error: +_error: pTaskInfo->code = code; } @@ -2046,7 +2045,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 " try next", + ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); @@ -2063,7 +2062,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); @@ -2072,7 +2071,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, + ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); } @@ -2260,7 +2259,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL); return pOperator; - _error: +_error: if (pInfo != NULL) { doDestroyExchangeOperatorInfo(pInfo); } @@ -3173,8 +3172,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType - ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval - : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval + : &((SIntervalAggOperatorInfo*)downstream->info)->interval; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); @@ -3353,9 +3352,9 @@ bool groupbyTbname(SNodeList* pGroupList) { SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) { - int32_t type = nodeType(pPhyNode); + int32_t type = nodeType(pPhyNode); STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; - const char* idstr = GET_TASKID(pTaskInfo); + const char* idstr = GET_TASKID(pTaskInfo); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { SOperatorInfo* pOperator = NULL; @@ -3428,7 +3427,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo for (int32_t i = 0; i < sz; i++) { STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); - qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId); + qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId); } #endif } @@ -3461,7 +3460,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - for(int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) { + for (int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) { STableKeyInfo* p = taosArrayGet(pList, i); tableListAddTableInfo(pTableListInfo, p->uid, 0); } @@ -3501,7 +3500,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } - size_t size = LIST_LENGTH(pPhyNode->pChildren); + size_t size = LIST_LENGTH(pPhyNode->pChildren); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); if (ops == NULL) { return NULL; @@ -3712,7 +3711,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32 *length = *(int32_t*)(*result); } - _downstream: +_downstream: for (int32_t i = 0; i < ops->numOfDownstream; ++i) { code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); if (code != TDB_CODE_SUCCESS) { @@ -3971,8 +3970,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat int32_t size = 0; void* pVal = NULL; SWinKey key = { - .ts = *(TSKEY*)pPos->key, - .groupId = pPos->groupId, + .ts = *(TSKEY*)pPos->key, + .groupId = pPos->groupId, }; int32_t code = streamStateGet(pState, &key, &pVal, &size); ASSERT(code == 0); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ebc3a962d3..9f06e639b3 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1691,14 +1691,14 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pInfo->srcRowIndex = 0; - pOperator->name = "FillOperator"; + pOperator->name = "StreamFillOperator"; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, - NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 88f3e4cff9..7b6ed5b67d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "filter.h" #include "executorimpl.h" +#include "filter.h" #include "function.h" #include "functionMgt.h" #include "tcommon.h" @@ -986,7 +986,7 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf // current result is done in computing final results. if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pResult); - SListNode *pNode = tdListPopHead(pResultRowInfo->openWindow); + SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow); taosMemoryFree(pNode); } } @@ -1255,35 +1255,33 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pInfo->binfo.pRes; - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - return pOperator->fpSet.getStreamResFn(pOperator); - } else { - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - return NULL; - } + ASSERT(pInfo->execModel == OPTR_EXEC_MODEL_BATCH); - blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - while (1) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); - - bool hasRemain = hasRemainResults(&pInfo->groupResInfo); - if (!hasRemain) { - doSetOperatorCompleted(pOperator); - break; - } - - if (pBlock->info.rows > 0) { - break; - } - } - - size_t rows = pBlock->info.rows; - pOperator->resultInfo.totalRows += rows; - - return (rows == 0) ? NULL : pBlock; + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; } + + blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); + while (1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + + if (pBlock->info.rows > 0) { + break; + } + } + + size_t rows = pBlock->info.rows; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0) ? NULL : pBlock; } static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 6a3bdb59c9..0fc75c4798 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -32,8 +32,6 @@ typedef struct { static SStreamGlobalEnv streamEnv; -// int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch); - int32_t streamDispatch(SStreamTask* pTask); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b71562cf45..e6d5859163 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -240,43 +240,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { return 0; } -#if 0 -int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) { - void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - - SStreamTaskRecoverRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); - pCont->inputStatus = pTask->inputStatus; - pCont->streamId = pTask->streamId; - pCont->reqTaskId = pTask->taskId; - pCont->rspTaskId = pReq->upstreamTaskId; - - pRsp->pCont = buf; - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp); - tmsgSendRsp(pRsp); - return 0; -} - -int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) { - streamProcessRunReq(pTask); - - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - // scan data to recover - pTask->inputStatus = TASK_INPUT_STATUS__RECOVER; - pTask->taskStatus = TASK_STATUS__RECOVER_SELF; - qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer); - if (streamPipelineExec(pTask, 100, true) < 0) { - return -1; - } - } else { - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->taskStatus = TASK_STATUS__NORMAL; - } - - return 0; -} -#endif - int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 46fab53659..e7f2b60704 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -138,63 +138,39 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } #if 0 -int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) { - ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); - - void* exec = pTask->exec.executor; - - while (1) { - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - int32_t batchCnt = 0; - while (1) { - SSDataBlock* output = NULL; - uint64_t ts = 0; - if (qExecTask(exec, &output, &ts) < 0) { - ASSERT(0); - } - if (output == NULL) break; - - SSDataBlock block = {0}; - assignOneDataBlock(&block, output); - block.info.childId = pTask->selfChildId; - taosArrayPush(pRes, &block); - - if (++batchCnt >= batchNum) break; - } - if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - break; - } - if (dispatch) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - qRes->childId = pTask->selfChildId; - - if (streamTaskOutput(pTask, qRes) < 0) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return -1; - } - - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - streamDispatch(pTask); - } +int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { + // fetch all queue item, merge according to batchLimit + int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall); + if (numOfItems == 0) { + qDebug("task: %d, stream task exec over, queue empty", pTask->taskId); + return 0; + } + SStreamQueueItem* pMerged = NULL; + SStreamQueueItem* pItem = NULL; + taosGetQitem(pTask->inputQall, (void**)&pItem); + if (pItem == NULL) { + if (pMerged != NULL) { + // process merged item } else { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return 0; } } + // if drop + if (pItem->type == STREAM_INPUT__DESTROY) { + // set status drop + return -1; + } + + if (pTask->taskLevel == TASK_LEVEL__SINK) { + ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, (SStreamDataBlock*)pItem); + } + + // exec impl + + // output + // try dispatch return 0; } #endif diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index c723618a4b..b683ba1926 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -482,7 +482,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); - wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType)); + wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index, + TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody); code = walWriteIndex(pWal, index, offset); if (code < 0) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index c8f128e666..0f992184b5 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -208,7 +208,7 @@ STaosQall *taosAllocateQall() { void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); } int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { - int32_t code = 0; + int32_t numOfItems = 0; bool empty; taosThreadMutexLock(&queue->mutex); @@ -219,13 +219,14 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { qall->current = queue->head; qall->start = queue->head; qall->numOfItems = queue->numOfItems; - code = qall->numOfItems; + numOfItems = qall->numOfItems; queue->head = NULL; queue->tail = NULL; queue->numOfItems = 0; queue->memOfItems = 0; - uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); + uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems, + queue->memOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } @@ -237,7 +238,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { qall->start = NULL; qall->numOfItems = 0; } - return code; + return numOfItems; } int32_t taosGetQitem(STaosQall *qall, void **ppItem) { From fd53afd70ca44948017eebe11d4a3464f7692fcb Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 8 Nov 2022 12:23:22 +0800 Subject: [PATCH 4/5] fix: add installation folder to system path (#17943) * fix: add installation folder to system path * fix: add c:\tdengine in env path to issc setup --- packaging/release.bat | 2 +- packaging/tools/make_install.bat | 97 ++++++++++++++++++++++---------- packaging/tools/tdengine.iss | 23 ++++++++ 3 files changed, 91 insertions(+), 31 deletions(-) diff --git a/packaging/release.bat b/packaging/release.bat index 4c82c5ead5..4ab7297f03 100644 --- a/packaging/release.bat +++ b/packaging/release.bat @@ -39,7 +39,7 @@ if not exist %work_dir%\debug\ver-%2-x86 ( md %work_dir%\debug\ver-%2-x86 ) cd %work_dir%\debug\ver-%2-x64 -call vcvarsall.bat x64 +rem #call vcvarsall.bat x64 cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DWEBSOCKET=true -DBUILD_HTTP=false -DBUILD_TEST=false -DVERNUMBER=%2 -DCPUTYPE=x64 cmake --build . rd /s /Q C:\TDengine diff --git a/packaging/tools/make_install.bat b/packaging/tools/make_install.bat index f5d1e45690..c19519f9a1 100644 --- a/packaging/tools/make_install.bat +++ b/packaging/tools/make_install.bat @@ -1,4 +1,7 @@ @echo off + +for /F %%a in ('echo prompt $E ^| cmd') do set "ESC=%%a" + goto %1 :needAdmin @@ -11,60 +14,94 @@ set binary_dir=%3 set binary_dir=%binary_dir:/=\\% set osType=%4 set verNumber=%5 -set tagert_dir=C:\\TDengine +set target_dir=C:\\TDengine -if not exist %tagert_dir% ( - mkdir %tagert_dir% +if not exist %target_dir% ( + mkdir %target_dir% ) -if not exist %tagert_dir%\\cfg ( - mkdir %tagert_dir%\\cfg +if not exist %target_dir%\\cfg ( + mkdir %target_dir%\\cfg ) -if not exist %tagert_dir%\\include ( - mkdir %tagert_dir%\\include +if not exist %target_dir%\\include ( + mkdir %target_dir%\\include ) -if not exist %tagert_dir%\\driver ( - mkdir %tagert_dir%\\driver +if not exist %target_dir%\\driver ( + mkdir %target_dir%\\driver ) if not exist C:\\TDengine\\cfg\\taos.cfg ( - copy %source_dir%\\packaging\\cfg\\taos.cfg %tagert_dir%\\cfg\\taos.cfg > nul + copy %source_dir%\\packaging\\cfg\\taos.cfg %target_dir%\\cfg\\taos.cfg > nul ) if exist %binary_dir%\\test\\cfg\\taosadapter.toml ( - if not exist %tagert_dir%\\cfg\\taosadapter.toml ( - copy %binary_dir%\\test\\cfg\\taosadapter.toml %tagert_dir%\\cfg\\taosadapter.toml > nul + if not exist %target_dir%\\cfg\\taosadapter.toml ( + copy %binary_dir%\\test\\cfg\\taosadapter.toml %target_dir%\\cfg\\taosadapter.toml > nul ) ) -copy %source_dir%\\include\\client\\taos.h %tagert_dir%\\include > nul -copy %source_dir%\\include\\util\\taoserror.h %tagert_dir%\\include > nul -copy %source_dir%\\include\\libs\\function\\taosudf.h %tagert_dir%\\include > nul -copy %binary_dir%\\build\\lib\\taos.lib %tagert_dir%\\driver > nul -copy %binary_dir%\\build\\lib\\taos_static.lib %tagert_dir%\\driver > nul -copy %binary_dir%\\build\\lib\\taos.dll %tagert_dir%\\driver > nul -copy %binary_dir%\\build\\bin\\taos.exe %tagert_dir% > nul -copy %binary_dir%\\build\\bin\\taosd.exe %tagert_dir% > nul -copy %binary_dir%\\build\\bin\\udfd.exe %tagert_dir% > nul +copy %source_dir%\\include\\client\\taos.h %target_dir%\\include > nul +copy %source_dir%\\include\\util\\taoserror.h %target_dir%\\include > nul +copy %source_dir%\\include\\libs\\function\\taosudf.h %target_dir%\\include > nul +copy %binary_dir%\\build\\lib\\taos.lib %target_dir%\\driver > nul +copy %binary_dir%\\build\\lib\\taos_static.lib %target_dir%\\driver > nul +copy %binary_dir%\\build\\lib\\taos.dll %target_dir%\\driver > nul +copy %binary_dir%\\build\\bin\\taos.exe %target_dir% > nul if exist %binary_dir%\\build\\bin\\taosBenchmark.exe ( - copy %binary_dir%\\build\\bin\\taosBenchmark.exe %tagert_dir% > nul + copy %binary_dir%\\build\\bin\\taosBenchmark.exe %target_dir% > nul ) if exist %binary_dir%\\build\\lib\\taosws.dll.lib ( - copy %binary_dir%\\build\\lib\\taosws.dll.lib %tagert_dir%\\driver > nul + copy %binary_dir%\\build\\lib\\taosws.dll.lib %target_dir%\\driver > nul ) if exist %binary_dir%\\build\\lib\\taosws.dll ( - copy %binary_dir%\\build\\lib\\taosws.dll %tagert_dir%\\driver > nul - copy %source_dir%\\tools\\taosws-rs\\target\\release\\taosws.h %tagert_dir%\\include > nul + copy %binary_dir%\\build\\lib\\taosws.dll %target_dir%\\driver > nul + copy %source_dir%\\tools\\taosws-rs\\target\\release\\taosws.h %target_dir%\\include > nul ) if exist %binary_dir%\\build\\bin\\taosdump.exe ( - copy %binary_dir%\\build\\bin\\taosdump.exe %tagert_dir% > nul -) -if exist %binary_dir%\\build\\bin\\taosadapter.exe ( - copy %binary_dir%\\build\\bin\\taosadapter.exe %tagert_dir% > nul + copy %binary_dir%\\build\\bin\\taosdump.exe %target_dir% > nul ) -mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close)&& echo To start/stop TDengine with administrator privileges: sc start/stop taosd &goto :eof +copy %binary_dir%\\build\\bin\\taosd.exe %target_dir% > nul +copy %binary_dir%\\build\\bin\\udfd.exe %target_dir% > nul + +if exist %binary_dir%\\build\\bin\\taosadapter.exe ( + copy %binary_dir%\\build\\bin\\taosadapter.exe %target_dir% > nul +) + +mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close) + +echo. +echo Please manually remove C:\TDengine from your system PATH environment after you remove TDengine software +echo. +echo To start/stop TDengine with administrator privileges: %ESC%[92msc start/stop taosd %ESC%[0m + +if exist %binary_dir%\\build\\bin\\taosadapter.exe ( + echo To start/stop taosAdapter with administrator privileges: %ESC%[92msc start/stop taosadapter %ESC%[0m +) + +goto :eof + :hasAdmin + +sc query "taosd" && sc stop taosd && sc delete taosd +sc query "taosadapter" && sc stop taosadapter && sc delete taosd + copy /y C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 > nul if exist C:\\TDengine\\driver\\taosws.dll ( copy /y C:\\TDengine\\driver\\taosws.dll C:\\Windows\\System32 > nul ) + sc query "taosd" >nul || sc create "taosd" binPath= "C:\\TDengine\\taosd.exe --win_service" start= DEMAND sc query "taosadapter" >nul || sc create "taosadapter" binPath= "C:\\TDengine\\taosadapter.exe" start= DEMAND + +set "env=HKLM\System\CurrentControlSet\Control\Session Manager\Environment" +for /f "tokens=2*" %%I in ('reg query "%env%" /v Path ^| findstr /i "\"') do ( + + rem // make addition persistent through reboots + reg add "%env%" /f /v Path /t REG_EXPAND_SZ /d "%%J;C:\TDengine" + + rem // apply change to the current process + for %%a in ("%%J;C:\TDengine") do path %%~a +) + +rem // use setx to set a temporary throwaway value to trigger a WM_SETTINGCHANGE +rem // applies change to new console windows without requiring a reboot +(setx /m foo bar & reg delete "%env%" /f /v foo) >NUL 2>NUL + diff --git a/packaging/tools/tdengine.iss b/packaging/tools/tdengine.iss index ec9c432092..981bee91b8 100644 --- a/packaging/tools/tdengine.iss +++ b/packaging/tools/tdengine.iss @@ -60,6 +60,29 @@ Source: {#MyAppSourceDir}{#MyAppIncludeName}; DestDir: "{app}\include"; Flags: i Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExcludeSource} ; Flags: igNoreversion recursesubdirs createallsubdirs Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs + +[Registry] +Root: HKLM; Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"; \ + ValueType: expandsz; ValueName: "Path"; ValueData: "{olddata};C:\TDengine"; \ + Check: NeedsAddPath('C:\TDengine') + +[Code] +function NeedsAddPath(Param: string): boolean; +var + OrigPath: string; +begin + if not RegQueryStringValue(HKEY_LOCAL_MACHINE, + 'SYSTEM\CurrentControlSet\Control\Session Manager\Environment', + 'Path', OrigPath) + then begin + Result := True; + exit; + end; + { look for the path with leading and trailing semicolon } + { Pos() returns 0 if not found } + Result := Pos(';' + Param + ';', ';' + OrigPath + ';') = 0; +end; + [UninstallDelete] Name: {app}\driver; Type: filesandordirs Name: {app}\connector; Type: filesandordirs From abd95562a0a24ba7d94168e111fe0bde9190e079 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 8 Nov 2022 12:36:28 +0800 Subject: [PATCH 5/5] enh: optimize the error code when create and delete a database in a dropping state --- source/dnode/mnode/impl/src/mndDb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 992acd9dee..3d9b91dce8 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -290,7 +290,7 @@ SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) { terrno = TSDB_CODE_MND_DB_IN_CREATING; - } else if (terrno = TSDB_CODE_SDB_OBJ_DROPPING) { + } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) { terrno = TSDB_CODE_MND_DB_IN_DROPPING; } else { terrno = TSDB_CODE_APP_ERROR;