Merge branch '3.0' into enh/TS-5574-3.0

This commit is contained in:
kailixu 2024-11-08 15:07:16 +08:00
commit e28f480df6
42 changed files with 629 additions and 306 deletions

View File

@ -155,6 +155,7 @@ typedef enum EStreamType {
STREAM_MID_RETRIEVE,
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
} EStreamType;
#pragma pack(push, 1)
@ -401,6 +402,8 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);
#define TSMA_RES_STB_EXTRA_COLUMN_NUM 4 // 3 columns: _wstart, _wend, _wduration, 1 tag: tbname
static inline bool isTsmaResSTb(const char* stbName) {
static bool showTsmaTables = false;
if (showTsmaTables) return false;
const char* pos = strstr(stbName, TSMA_RES_STB_POSTFIX);
if (pos && strlen(stbName) == (pos - stbName) + strlen(TSMA_RES_STB_POSTFIX)) {
return true;

View File

@ -188,7 +188,6 @@ extern int32_t tsMaxRetryWaitTime;
extern bool tsUseAdapter;
extern int32_t tsMetaCacheMaxSize;
extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogThresholdTest;
extern char tsSlowLogExceptDb[];
extern int32_t tsSlowLogScope;
extern int32_t tsSlowLogMaxLen;

View File

@ -676,7 +676,7 @@ typedef struct {
int32_t tsSlowLogThreshold;
int32_t tsSlowLogMaxLen;
int32_t tsSlowLogScope;
int32_t tsSlowLogThresholdTest;
int32_t tsSlowLogThresholdTest; //Obsolete
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN];
} SMonitorParas;
@ -3220,6 +3220,7 @@ int tDecodeSVCreateTbBatchRsp(SDecoder* pCoder, SVCreateTbBatchRsp* pRsp);
typedef struct {
char* name;
uint64_t suid; // for tmq in wal format
int64_t uid;
int8_t igNotExists;
} SVDropTbReq;

View File

@ -336,6 +336,7 @@ typedef struct SStateStore {
int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache,
int32_t* pWinCode);
int32_t (*streamStateDeleteParName)(SStreamState* pState, int64_t groupId);
int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode);

View File

@ -116,6 +116,7 @@ void streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache, int32_t* pWinCode);
int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId);
// group id
int32_t streamStateGroupPut(SStreamState* pState, int64_t groupId, void* value, int32_t vLen);

View File

@ -462,7 +462,7 @@ struct SStreamTask {
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
void* pBackend;
int8_t subtableWithoutMd5;
int8_t subtableWithoutMd5; // only for tsma stream tasks
char reserve[256];
char* backendPath;
};

View File

@ -138,6 +138,7 @@ typedef struct {
int8_t scanMeta;
int8_t deleteMsg;
int8_t enableRef;
int8_t scanDropCtb;
} SWalFilterCond;
// todo hide this struct

View File

@ -294,8 +294,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
}
}
if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL ||
duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThresholdTest * 1000000UL) &&
if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL) &&
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {

View File

@ -29,6 +29,12 @@ TARGET_LINK_LIBRARIES(
# PUBLIC os util common transport monitor parser catalog scheduler function gtest taos_static qcom executor
#)
ADD_EXECUTABLE(userOperTest ../../../tests/script/api/passwdTest.c)
TARGET_LINK_LIBRARIES(
userOperTest
PUBLIC taos
)
TARGET_INCLUDE_DIRECTORIES(
clientTest
PUBLIC "${TD_SOURCE_DIR}/include/client/"
@ -69,3 +75,8 @@ add_test(
# NAME clientMonitorTest
# COMMAND clientMonitorTest
# )
add_test(
NAME userOperTest
COMMAND userOperTest
)

View File

@ -185,7 +185,6 @@ int32_t tsMaxRetryWaitTime = 10000;
bool tsUseAdapter = false;
int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 10; // seconds
int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY;
char *tsSlowLogScopeString = "query";
@ -766,7 +765,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 86400, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "slowLogThresholdTest", tsSlowLogThresholdTest, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 1, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "slowLogMaxLen", tsSlowLogMaxLen, 1, 16384, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddString(pCfg, "slowLogScope", tsSlowLogScopeString, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
@ -1450,9 +1448,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "slowLogExceptDb");
tstrncpy(tsSlowLogExceptDb, pItem->str, TSDB_DB_NAME_LEN);
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "slowLogThresholdTest");
tsSlowLogThresholdTest = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "slowLogThreshold");
tsSlowLogThreshold = pItem->i32;
@ -2024,7 +2019,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"monitor", &tsEnableMonitor},
{"monitorInterval", &tsMonitorInterval},
{"slowLogThreshold", &tsSlowLogThreshold},
{"slowLogThresholdTest", &tsSlowLogThresholdTest},
{"slowLogMaxLen", &tsSlowLogMaxLen},
{"mndSdbWriteDelta", &tsMndSdbWriteDelta},

View File

@ -76,7 +76,7 @@ static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas *p
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogScope));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest)); //Obsolete
TAOS_CHECK_RETURN(tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb));
return 0;
}
@ -87,7 +87,7 @@ static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas *pMoni
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope));
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen));
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold));
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest));
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest)); //Obsolete
TAOS_CHECK_RETURN(tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb));
return 0;
}
@ -10277,6 +10277,7 @@ static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) {
TAOS_CHECK_RETURN(tStartEncode(pCoder));
TAOS_CHECK_RETURN(tEncodeCStr(pCoder, pReq->name));
TAOS_CHECK_RETURN(tEncodeU64(pCoder, pReq->suid));
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pReq->uid));
TAOS_CHECK_RETURN(tEncodeI8(pCoder, pReq->igNotExists));
tEndEncode(pCoder);
@ -10287,6 +10288,7 @@ static int32_t tDecodeSVDropTbReq(SDecoder *pCoder, SVDropTbReq *pReq) {
TAOS_CHECK_RETURN(tStartDecode(pCoder));
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pReq->name));
TAOS_CHECK_RETURN(tDecodeU64(pCoder, &pReq->suid));
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pReq->uid));
TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pReq->igNotExists));
tEndDecode(pCoder);

View File

@ -195,7 +195,6 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
req.clusterCfg.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
char timestr[32] = "1970-01-01 00:00:00.00";
if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0) != 0) {

View File

@ -482,7 +482,6 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S
CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogThresholdTest, DND_REASON_STATUS_MONITOR_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);

View File

@ -304,7 +304,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
connectRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
connectRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
connectRsp.enableAuditDelete = tsEnableAuditDelete;
tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
connectRsp.whiteListVer = pUser->ipWhiteListVer;
@ -706,7 +705,6 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
batchRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
batchRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
batchRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
batchRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;

View File

@ -4063,8 +4063,8 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
}
typedef struct SVDropTbVgReqs {
SVDropTbBatchReq req;
SVgroupInfo info;
SArray *pBatchReqs;
SVgroupInfo info;
} SVDropTbVgReqs;
typedef struct SMDropTbDbInfo {
@ -4086,45 +4086,21 @@ typedef struct SMDropTbTsmaInfos {
} SMDropTbTsmaInfos;
typedef struct SMndDropTbsWithTsmaCtx {
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
SArray *pResTbNames; // SArray<char*>
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
} SMndDropTbsWithTsmaCtx;
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
int32_t vgId);
static void destroySVDropTbBatchReqs(void *p);
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
if (!p) return;
if (p->pDbMap) {
void *pIter = taosHashIterate(p->pDbMap, NULL);
while (pIter) {
SMDropTbDbInfo *pInfo = pIter;
taosArrayDestroy(pInfo->dbVgInfos);
pIter = taosHashIterate(p->pDbMap, pIter);
}
taosHashCleanup(p->pDbMap);
}
if (p->pResTbNames) {
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
}
if (p->pTsmaMap) {
void *pIter = taosHashIterate(p->pTsmaMap, NULL);
while (pIter) {
SMDropTbTsmaInfos *pInfos = pIter;
taosArrayDestroy(pInfos->pTsmaInfos);
pIter = taosHashIterate(p->pTsmaMap, pIter);
}
taosHashCleanup(p->pTsmaMap);
}
if (p->pVgMap) {
void *pIter = taosHashIterate(p->pVgMap, NULL);
while (pIter) {
SVDropTbVgReqs *pReqs = pIter;
taosArrayDestroy(pReqs->req.pArray);
taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs);
pIter = taosHashIterate(p->pVgMap, pIter);
}
taosHashCleanup(p->pVgMap);
@ -4136,24 +4112,13 @@ static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
int32_t code = 0;
SMndDropTbsWithTsmaCtx *pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
if (!pCtx) return terrno;
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (!pCtx->pTsmaMap) {
code = terrno;
goto _end;
}
pCtx->pDbMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (!pCtx->pDbMap) {
code = terrno;
goto _end;
}
pCtx->pResTbNames = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
pCtx->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (!pCtx->pVgMap) {
code = terrno;
goto _end;
}
*ppCtx = pCtx;
_end:
if (code) mndDestroyDropTbsWithTsmaCtx(pCtx);
@ -4192,16 +4157,43 @@ static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, con
}
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
int32_t contLen) {
int32_t contLen, tmsg_t msgType) {
STransAction action = {0};
action.epSet = pVgReqs->info.epSet;
action.pCont = pCont;
action.contLen = contLen;
action.msgType = TDMT_VND_DROP_TABLE;
action.msgType = msgType;
action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return mndTransAppendRedoAction(pTrans, &action);
}
static int32_t mndBuildDropTbRedoActions(SMnode *pMnode, STrans *pTrans, SHashObj *pVgMap, tmsg_t msgType) {
int32_t code = 0;
void *pIter = taosHashIterate(pVgMap, NULL);
while (pIter) {
const SVDropTbVgReqs *pVgReqs = pIter;
int32_t len = 0;
for (int32_t i = 0; i < taosArrayGetSize(pVgReqs->pBatchReqs) && code == TSDB_CODE_SUCCESS; ++i) {
SVDropTbBatchReq *pBatchReq = taosArrayGet(pVgReqs->pBatchReqs, i);
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, pBatchReq, &len);
if (!p) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
break;
}
if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) {
break;
}
}
if (TSDB_CODE_SUCCESS != code) {
taosHashCancelIterate(pVgMap, pIter);
break;
}
pIter = taosHashIterate(pVgMap, pIter);
}
return code;
}
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
int32_t code = 0;
SMnode *pMnode = pRsp->info.node;
@ -4216,23 +4208,7 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
void *pIter = taosHashIterate(pCtx->pVgMap, NULL);
while (pIter) {
const SVDropTbVgReqs *pVgReqs = pIter;
int32_t len = 0;
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
if (!p) {
taosHashCancelIterate(pCtx->pVgMap, pIter);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len)) != 0) {
taosHashCancelIterate(pCtx->pVgMap, pIter);
goto _OVER;
}
pIter = taosHashIterate(pCtx->pVgMap, pIter);
}
if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
_OVER:
@ -4257,10 +4233,11 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
if (code) goto _OVER;
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
SMDropTbReqsOnSingleVg *pReq = taosArrayGet(dropReq.pVgReqs, i);
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
code = mndDropTbForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
if (code) goto _OVER;
}
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) {
code = mndCreateDropTbsTxnPrepare(pReq, pCtx);
if (code == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
_OVER:
@ -4269,87 +4246,58 @@ _OVER:
TAOS_RETURN(code);
}
static int32_t createDropTbBatchReq(const SVDropTbReq *pReq, SVDropTbBatchReq *pBatchReq) {
pBatchReq->nReqs = 1;
pBatchReq->pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
if (!pBatchReq->pArray) return terrno;
if (taosArrayPush(pBatchReq->pArray, pReq) == NULL) {
taosArrayDestroy(pBatchReq->pArray);
pBatchReq->pArray = NULL;
return terrno;
}
return TSDB_CODE_SUCCESS;
}
static void destroySVDropTbBatchReqs(void *p) {
SVDropTbBatchReq *pReq = p;
taosArrayDestroy(pReq->pArray);
pReq->pArray = NULL;
}
static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid,
bool ignoreNotExists) {
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists};
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists, .uid = 0};
SVDropTbVgReqs *pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
SVDropTbVgReqs reqs = {0};
if (pReqs == NULL) {
reqs.info = *pVgInfo;
reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
if (reqs.req.pArray == NULL) {
SVDropTbVgReqs *pVgReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
SVDropTbVgReqs vgReqs = {0};
if (pVgReqs == NULL) {
vgReqs.info = *pVgInfo;
vgReqs.pBatchReqs = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbBatchReq));
if (!vgReqs.pBatchReqs) return terrno;
SVDropTbBatchReq batchReq = {0};
int32_t code = createDropTbBatchReq(&req, &batchReq);
if (TSDB_CODE_SUCCESS != code) return code;
if (taosArrayPush(vgReqs.pBatchReqs, &batchReq) == NULL) {
taosArrayDestroy(batchReq.pArray);
return terrno;
}
if (taosArrayPush(reqs.req.pArray, &req) == NULL) {
return terrno;
}
if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &reqs, sizeof(reqs)) != 0) {
if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &vgReqs, sizeof(vgReqs)) != 0) {
taosArrayDestroyEx(vgReqs.pBatchReqs, destroySVDropTbBatchReqs);
return terrno;
}
} else {
if (taosArrayPush(pReqs->req.pArray, &req) == NULL) {
SVDropTbBatchReq batchReq = {0};
int32_t code = createDropTbBatchReq(&req, &batchReq);
if (TSDB_CODE_SUCCESS != code) return code;
if (taosArrayPush(pVgReqs->pBatchReqs, &batchReq) == NULL) {
taosArrayDestroy(batchReq.pArray);
return terrno;
}
}
return 0;
}
int vgInfoCmp(const void *lp, const void *rp) {
SVgroupInfo *pLeft = (SVgroupInfo *)lp;
SVgroupInfo *pRight = (SVgroupInfo *)rp;
if (pLeft->hashBegin < pRight->hashBegin) {
return -1;
} else if (pLeft->hashBegin > pRight->hashBegin) {
return 1;
}
return 0;
}
static int32_t mndGetDbVgInfoForTsma(SMnode *pMnode, const char *dbname, SMDropTbTsmaInfo *pInfo) {
int32_t code = 0;
SDbObj *pDb = mndAcquireDb(pMnode, dbname);
if (!pDb) {
code = TSDB_CODE_MND_DB_NOT_EXIST;
goto _end;
}
pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (!pInfo->dbInfo.dbVgInfos) {
code = terrno;
goto _end;
}
mndBuildDBVgroupInfo(pDb, pMnode, pInfo->dbInfo.dbVgInfos);
taosArraySort(pInfo->dbInfo.dbVgInfos, vgInfoCmp);
pInfo->dbInfo.hashPrefix = pDb->cfg.hashPrefix;
pInfo->dbInfo.hashSuffix = pDb->cfg.hashSuffix;
pInfo->dbInfo.hashMethod = pDb->cfg.hashMethod;
_end:
if (pDb) mndReleaseDb(pMnode, pDb);
if (code && pInfo->dbInfo.dbVgInfos) {
taosArrayDestroy(pInfo->dbInfo.dbVgInfos);
pInfo->dbInfo.dbVgInfos = NULL;
}
TAOS_RETURN(code);
}
int32_t vgHashValCmp(const void *lp, const void *rp) {
uint32_t *key = (uint32_t *)lp;
SVgroupInfo *pVg = (SVgroupInfo *)rp;
if (*key < pVg->hashBegin) {
return -1;
} else if (*key > pVg->hashEnd) {
return 1;
}
return 0;
}
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
int32_t vgId) {
int32_t code = 0;
@ -4365,88 +4313,9 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWith
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
// get all stb uids
for (int32_t i = 0; i < pTbs->size; ++i) {
const SVDropTbReq *pTb = taosArrayGet(pTbs, i);
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
} else {
SMDropTbTsmaInfos infos = {0};
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
if (!infos.pTsmaInfos) {
code = terrno;
goto _end;
}
if (taosHashPut(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid), &infos, sizeof(infos)) != 0) {
code = terrno;
goto _end;
}
}
}
void *pIter = NULL;
SSmaObj *pSma = NULL;
char buf[TSDB_TABLE_FNAME_LEN] = {0};
// get used tsmas and it's dbs
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
if (!pIter) break;
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
if (pInfos) {
SMDropTbTsmaInfo info = {0};
int32_t len = sprintf(buf, "%s", pSma->name);
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_FNAME_LEN, "%s", buf);
SMDropTbDbInfo *pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
info.suid = pSma->dstTbUid;
if (!pDbInfo) {
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pSma);
goto _end;
}
if (taosHashPut(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN, &info.dbInfo, sizeof(SMDropTbDbInfo)) != 0) {
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pSma);
goto _end;
}
} else {
info.dbInfo = *pDbInfo;
}
if (taosArrayPush(pInfos->pTsmaInfos, &info) == NULL) {
code = terrno;
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pSma);
goto _end;
}
}
sdbRelease(pMnode->pSdb, pSma);
}
// generate vg req map
for (int32_t i = 0; i < pTbs->size; ++i) {
SVDropTbReq *pTb = taosArrayGet(pTbs, i);
TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists), NULL, _end);
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
SArray *pVgInfos = NULL;
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
char resTbFullName[TSDB_TABLE_FNAME_LEN + 1] = {0};
for (int32_t j = 0; j < pInfos->pTsmaInfos->size; ++j) {
SMDropTbTsmaInfo *pInfo = taosArrayGet(pInfos->pTsmaInfos, j);
int32_t len = sprintf(buf, "%s_%s", pInfo->tsmaResTbNamePrefix, pTb->name);
len = taosCreateMD5Hash(buf, len);
len = snprintf(resTbFullName, TSDB_TABLE_FNAME_LEN + 1, "%s.%s", pInfo->tsmaResTbDbFName, buf);
uint32_t hashVal = taosGetTbHashVal(resTbFullName, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix,
pInfo->dbInfo.hashSuffix);
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
void *p = taosStrdup(resTbFullName + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
if (taosArrayPush(pCtx->pResTbNames, &p) == NULL) {
code = terrno;
goto _end;
}
TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end);
}
}
_end:
return code;
@ -4474,9 +4343,10 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
code = mndInitDropTbsWithTsmaCtx(&pCtx);
if (code) goto _end;
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
code = mndDropTbForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
if (code) goto _end;
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
code = mndCreateDropTbsTxnPrepare(pRsp, pCtx);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_end:
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
tDecoderClear(&decoder);

View File

@ -31,6 +31,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStatePutParName = streamStatePutParName;
pStore->streamStateGetParName = streamStateGetParName;
pStore->streamStateDeleteParName = streamStateDeleteParName;
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
pStore->streamStateReleaseBuf = streamStateReleaseBuf;

View File

@ -146,7 +146,7 @@ int32_t tqBuildFName(char** data, const char* path, char* name);
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name);
// tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
@ -158,6 +158,7 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
#define TQ_ERR_GO_TO_END(c) \
do { \

View File

@ -1551,7 +1551,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
_resume_delete:
version = RSMA_EXEC_MSG_VER(msg);
if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
&packData.pDataBlock, 1))) {
&packData.pDataBlock, 1, STREAM_DELETE_DATA))) {
taosFreeQitem(msg);
TAOS_CHECK_EXIT(code);
}

View File

@ -758,7 +758,8 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
bool scanDropCtb = pTask->subtableWithoutMd5 ? true : false;
SWalFilterCond cond = {.deleteMsg = 1, .scanDropCtb = scanDropCtb}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
if (pTask->exec.pWalReader == NULL) {
tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));

View File

@ -366,8 +366,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
} else if (pCont->msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0);
EStreamType blockType = STREAM_DELETE_DATA;
code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
if (code == TSDB_CODE_SUCCESS) {
if (*pItem == NULL) {
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
@ -382,6 +382,20 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
return code;
}
} else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
if (TSDB_CODE_SUCCESS == code) {
if (!*pItem) {
continue;
} else {
tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
}
} else {
terrno = code;
return code;
}
} else {
tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
return TSDB_CODE_STREAM_INTERNAL_ERROR;

View File

@ -53,6 +53,7 @@ static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
int64_t earlyTs);
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName);
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr, bool newSubTableRule) {
@ -138,7 +139,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
return 0;
}
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
@ -170,17 +171,50 @@ end:
return ret;
}
static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) {
int32_t code = 0;
SEncoder ec = {0};
tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code);
if (code < 0) {
code = TSDB_CODE_INVALID_MSG;
goto end;
}
*contLen += sizeof(SMsgHead);
*ppBuf = rpcMallocCont(*contLen);
if (!*ppBuf) {
code = terrno;
goto end;
}
((SMsgHead*)(*ppBuf))->vgId = vgId;
((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen);
tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
code = tEncodeSVDropTbBatchReq(&ec, pReqs);
tEncoderClear(&ec);
if (code < 0) {
rpcFreeCont(*ppBuf);
*ppBuf = NULL;
*contLen = 0;
code = TSDB_CODE_INVALID_MSG;
goto end;
}
end:
return code;
}
static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) {
void* buf = NULL;
int32_t tlen = 0;
int32_t code = encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
int32_t code = encoder(pReqs, TD_VID(pVnode), &buf, &tlen);
if (code) {
tqError("vgId:%d failed to encode create table msg, create table failed, code:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen};
SRpcMsg msg = {.msgType = msgType, .pCont = buf, .contLen = tlen};
code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
if (code) {
tqError("failed to put into write-queue since %s", terrstr());
@ -388,7 +422,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
}
reqs.nReqs = taosArrayGetSize(reqs.pArray);
code = tqPutReqToQueue(pVnode, &reqs);
code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to send create table msg", id);
}
@ -399,6 +433,61 @@ _end:
return code;
}
static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock,
SStreamTask* pTask, int64_t suid) {
int32_t lino = 0;
int32_t code = 0;
int32_t rows = pDataBlock->info.rows;
const char* id = pTask->id.idStr;
SVDropTbBatchReq batchReq = {0};
SVDropTbReq req = {0};
if (rows <= 0 || rows > 1 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS;
batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq));
if (!batchReq.pArray) return terrno;
batchReq.nReqs = rows;
req.suid = suid;
req.igNotExists = true;
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
int32_t i = 0;
void* pData = colDataGetVarData(pTbNameCol, i);
memcpy(tbName, varDataVal(pData), varDataLen(pData));
tbName[varDataLen(pData) + 1] = 0;
req.name = tbName;
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
TSDB_CHECK_CODE(terrno, lino, _exit);
}
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
code = metaGetTableEntryByName(&mr, tbName);
if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
STableSinkInfo* pTableSinkInfo = NULL;
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
if (alreadyCached) {
pTableSinkInfo->uid = mr.me.uid;
}
}
metaReaderClear(&mr);
tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
TSDB_CHECK_CODE(code, lino, _exit);
code = doWaitForDstTableDropped(pVnode, pTask, tbName);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (batchReq.pArray) {
taosArrayDestroy(batchReq.pArray);
}
return code;
}
int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) {
const char* id = pTask->id.idStr;
int32_t vgId = TD_VID(pVnode);
@ -807,6 +896,40 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_SUCCESS;
}
static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName) {
int32_t vgId = TD_VID(pVnode);
int64_t suid = pTask->outputInfo.tbSink.stbUid;
const char* id = pTask->id.idStr;
while (1) {
if (streamTaskShouldStop(pTask)) {
tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName);
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
int32_t code = metaGetTableEntryByName(&mr, dstTableName);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
metaReaderClear(&mr);
break;
} else if (TSDB_CODE_SUCCESS == code) {
if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) {
metaReaderClear(&mr);
taosMsleep(100);
tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName);
} else {
metaReaderClear(&mr);
break;
}
} else {
tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName);
metaReaderClear(&mr);
return terrno;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
int32_t nameLen = strlen(pDstTableName);
(*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
@ -1032,7 +1155,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
}
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
if (!onlySubmitData) {
if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
numOfBlocks);
@ -1052,6 +1175,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue;
} else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else {
code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
}

View File

@ -572,7 +572,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
return 0;
}
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
int32_t code = 0;
int32_t line = 0;
SDecoder* pCoder = &(SDecoder){0};
@ -593,7 +593,7 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
}
SSDataBlock* pDelBlock = NULL;
code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock);
code = createSpecialDataBlock(blockType, &pDelBlock);
TSDB_CHECK_CODE(code, line, END);
code = blockDataEnsureCapacity(pDelBlock, numOfTables);
@ -751,3 +751,45 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
return TSDB_CODE_SUCCESS;
}
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
int32_t code = 0;
int32_t lino = 0;
SDecoder dc = {0};
SVDropTbBatchReq batchReq = {0};
tDecoderInit(&dc, (uint8_t*)data, len);
code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
TSDB_CHECK_CODE(code, lino, _exit);
if (batchReq.nReqs <= 0) goto _exit;
SSDataBlock* pBlock = NULL;
code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
TSDB_CHECK_CODE(code, lino, _exit);
code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
TSDB_CHECK_CODE(code, lino, _exit);
pBlock->info.rows = batchReq.nReqs;
pBlock->info.version = ver;
for (int32_t i = 0; i < batchReq.nReqs; ++i) {
SVDropTbReq* pReq = batchReq.pReqs + i;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
TSDB_CHECK_CODE(code, lino, _exit);
((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
_exit:
tDecoderClear(&dc);
if (TSDB_CODE_SUCCESS != code) {
tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
blockDataCleanup(pBlock);
taosMemoryFree(pBlock);
}
return code;
}

View File

@ -147,6 +147,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStatePutParName = streamStatePutParName;
pStore->streamStateGetParName = streamStateGetParName;
pStore->streamStateDeleteParName = streamStateDeleteParName;
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
pStore->streamStateReleaseBuf = streamStateReleaseBuf;

View File

@ -50,6 +50,8 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq,
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
SRpcMsg *pOriginRpc);
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
@ -481,6 +483,61 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return code;
}
int32_t vnodePreProcessDropTbMsg(SVnode* pVnode, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t size = 0;
SDecoder dc = {0};
SEncoder ec = {0};
SVDropTbBatchReq receivedBatchReqs = {0};
SVDropTbBatchReq sentBatchReqs = {0};
tDecoderInit(&dc, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead));
code = tDecodeSVDropTbBatchReq(&dc, &receivedBatchReqs);
if (code < 0) {
terrno = code;
TSDB_CHECK_CODE(code, lino, _exit);
}
sentBatchReqs.pArray = taosArrayInit(receivedBatchReqs.nReqs, sizeof(SVDropTbReq));
if (!sentBatchReqs.pArray) {
code = terrno;
goto _exit;
}
for (int32_t i = 0; i < receivedBatchReqs.nReqs; ++i) {
SVDropTbReq* pReq = receivedBatchReqs.pReqs + i;
tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, pReq->name);
if (uid == 0) {
vWarn("vgId:%d, preprocess drop ctb: %s not found", TD_VID(pVnode), pReq->name);
continue;
}
pReq->uid = uid;
vDebug("vgId:%d %s for: %s, uid: %"PRId64, TD_VID(pVnode), __func__, pReq->name, pReq->uid);
if (taosArrayPush(sentBatchReqs.pArray, pReq) == NULL) {
code = terrno;
goto _exit;
}
}
sentBatchReqs.nReqs = sentBatchReqs.pArray->size;
tEncodeSize(tEncodeSVDropTbBatchReq, &sentBatchReqs, size, code);
tEncoderInit(&ec, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), size);
code = tEncodeSVDropTbBatchReq(&ec, &sentBatchReqs);
tEncoderClear(&ec);
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d %s failed to encode drop tb batch req: %s", TD_VID(pVnode), __func__, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
tDecoderClear(&dc);
if (sentBatchReqs.pArray) {
taosArrayDestroy(sentBatchReqs.pArray);
}
return code;
}
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
@ -507,6 +564,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_ARB_CHECK_SYNC: {
code = vnodePreProcessArbCheckSyncMsg(pVnode, pMsg);
} break;
case TDMT_VND_DROP_TABLE: {
code = vnodePreProcessDropTbMsg(pVnode, pMsg);
} break;
default:
break;
}
@ -1110,7 +1170,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
STbUidStore *pStore = NULL;
SArray *tbUids = NULL;
SArray *tbNames = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
@ -2512,3 +2571,4 @@ _OVER:
int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
#endif

View File

@ -1083,18 +1083,13 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false;
if (LIST_LENGTH(pGroupList) == 1) {
SNode* p = nodesListGetNode(pGroupList, 0);
if (!p) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return false;
}
if (p->type == QUERY_NODE_FUNCTION) {
// partition by tbname/group by tbname
bytbname = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0);
SNode*pNode = NULL;
FOREACH(pNode, pGroupList) {
if (pNode->type == QUERY_NODE_FUNCTION) {
bytbname = (strcmp(((struct SFunctionNode*)pNode)->functionName, "tbname") == 0);
break;
}
}
return bytbname;
}

View File

@ -1326,7 +1326,6 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
SSDataBlock* pTmpBlock = NULL;
code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);

View File

@ -289,6 +289,7 @@ static int32_t doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* p
pTaskInfo, &pTableScanInfo->metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
if (pTaskInfo->streamInfo.pState) blockDataCleanup(pBlock);
code = 0;
}
}
@ -3038,10 +3039,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
code = 0;
}
if (code) {
blockDataFreeRes((SSDataBlock*)pBlock);
QUERY_CHECK_CODE(code, lino, _end);
@ -3535,6 +3532,46 @@ static int32_t copyGetResultBlock(SSDataBlock* dest, TSKEY start, TSKEY end) {
return appendDataToSpecialBlock(dest, &start, &end, NULL, NULL, NULL);
}
static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t *deleteNum) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
// uid is the same as gid
SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pTbnameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
int64_t* gpIdCol = (int64_t*)pGpIdCol->pData;
void* pParName = NULL;
int32_t winCode = 0;
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i],
&pParName, false, &winCode);
if (TSDB_CODE_SUCCESS == code && winCode != 0) {
qDebug("delete stream part Name for:%"PRId64 " not found", gpIdCol[i]);
colDataSetNULL(pTbnameCol, i);
continue;
}
(*deleteNum)++;
QUERY_CHECK_CODE(code, lino, _end);
char varTbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE + 1] = {0};
varDataSetLen(varTbName, strlen(pParName));
int64_t len = tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pParName);
code = colDataSetVal(pTbnameCol, i, varTbName, false);
qDebug("delete stream part for:%"PRId64 " res tb: %s", gpIdCol[i], (char*)pParName);
pInfo->stateStore.streamStateFreeVal(pParName);
QUERY_CHECK_CODE(code, lino, _end);
code = pInfo->stateStore.streamStateDeleteParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i]);
QUERY_CHECK_CODE(code, lino, _end);
pBlock->info.id.groupId = gpIdCol[i];
// currently, only one valid row in pBlock
memcpy(pBlock->info.parTbName, varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// NOTE: this operator does never check if current status is done or not
int32_t code = TSDB_CODE_SUCCESS;
@ -3774,6 +3811,12 @@ FETCH_NEXT_BLOCK:
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
} break;
case STREAM_DROP_CHILD_TABLE: {
int32_t deleteNum = 0;
code = deletePartName(pInfo, pBlock, &deleteNum);
QUERY_CHECK_CODE(code, lino, _end);
if (deleteNum == 0) goto FETCH_NEXT_BLOCK;
} break;
case STREAM_CHECKPOINT: {
qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");
} break;
@ -3915,7 +3958,13 @@ FETCH_NEXT_BLOCK:
}
code = setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
QUERY_CHECK_CODE(code, lino, _end);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
pInfo->pRes->info.rows = 0;
code = TSDB_CODE_SUCCESS;
} else {
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->pRes->info.rows == 0) {
continue;
}

View File

@ -5215,7 +5215,7 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_DROP_CHILD_TABLE) {
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pBlock;
return code;

View File

@ -433,9 +433,6 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = NULL;
code = nodesCloneNode(pQuery->pPrepareRoot, &pQuery->pRoot);
if (NULL == pQuery->pRoot) {
code = code;
}
}
if (TSDB_CODE_SUCCESS == code) {
rewriteExprAlias(pQuery->pRoot);

View File

@ -1534,21 +1534,20 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) {
pSort->pSortKeys = NULL;
code = nodesCloneList(pSelect->pOrderByList, &pSort->pSortKeys);
if (NULL == pSort->pSortKeys) {
code = code;
}
SNode* pNode = NULL;
SOrderByExprNode* firstSortKey = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0);
if (isPrimaryKeySort(pSelect->pOrderByList)) pSort->node.outputTsOrder = firstSortKey->order;
if (firstSortKey->pExpr->type == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)firstSortKey->pExpr;
int16_t projIdx = 1;
FOREACH(pNode, pSelect->pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp(pCol->node.aliasName, pExpr->aliasName)) {
pCol->projIdx = projIdx; break;
if (NULL != pSort->pSortKeys) {
SNode* pNode = NULL;
SOrderByExprNode* firstSortKey = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0);
if (isPrimaryKeySort(pSelect->pOrderByList)) pSort->node.outputTsOrder = firstSortKey->order;
if (firstSortKey->pExpr->type == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)firstSortKey->pExpr;
int16_t projIdx = 1;
FOREACH(pNode, pSelect->pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp(pCol->node.aliasName, pExpr->aliasName)) {
pCol->projIdx = projIdx; break;
}
projIdx++;
}
projIdx++;
}
}
}

View File

@ -836,11 +836,9 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
nodesDestroyNode(pMergeWin->pTsEnd);
pMergeWin->pTsEnd = NULL;
code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMergeWin->pTsEnd);
if (NULL == pMergeWin->pTsEnd) {
code = code;
}
}
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
if (TSDB_CODE_SUCCESS == code)
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,

View File

@ -223,6 +223,7 @@ int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGro
// parname cf
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId);
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);

View File

@ -4432,6 +4432,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
return code;
}
int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId);
return code;
}
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);

View File

@ -166,6 +166,8 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
return "checkpoint-trigger";
case STREAM_INPUT__TRANS_STATE:
return "trans-state";
case STREAM_INPUT__REF_DATA_BLOCK:
return "ref-block";
default:
return "datablock";
}
@ -211,7 +213,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
// do not merge blocks for sink node and check point data block
int8_t type = qItem->type;
if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) {
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK) {
const char* p = streamQueueItemGetTypeStr(type);
if (*pInput == NULL) {

View File

@ -525,6 +525,18 @@ _end:
return code;
}
int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
if (TSDB_CODE_SUCCESS != code) {
qWarn("failed to remove parname from cache, code:%d", code);
}
code = streamStateDeleteParName_rocksdb(pState, groupId);
if (TSDB_CODE_SUCCESS != code) {
qWarn("failed to remove parname from rocksdb, code:%d", code);
}
return TSDB_CODE_SUCCESS;
}
void streamStateDestroy(SStreamState* pState, bool remove) {
streamFileStateDestroy(pState->pFileState);
// streamStateDestroy_rocksdb(pState, remove);

View File

@ -89,6 +89,8 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
TAOS_RETURN(walFetchBody(pReader));
} else if (type == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
TAOS_RETURN(walFetchBody(pReader));
} else {
TAOS_CHECK_RETURN(walSkipFetchBody(pReader));

View File

@ -0,0 +1,55 @@
import os
import platform
import subprocess
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame.epath import *
from frame import *
class TDTestCase(TBase):
def apiPath(self):
apiPath = None
currentFilePath = os.path.dirname(os.path.realpath(__file__))
if (os.sep.join(["community", "tests"]) in currentFilePath):
testFilePath = currentFilePath[:currentFilePath.find(os.sep.join(["community", "tests"]))]
else:
testFilePath = currentFilePath[:currentFilePath.find(os.sep.join(["TDengine", "tests"]))]
for root, dirs, files in os.walk(testFilePath):
if ("passwdTest.c" in files):
apiPath = root
break
return apiPath
def run(self):
apiPath = self.apiPath()
tdLog.info(f"api path: {apiPath}")
if platform.system().lower() == 'linux':
p = subprocess.Popen(f"cd {apiPath} && make", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if 0 != p.returncode:
tdLog.exit("Test script passwdTest.c make failed")
p = subprocess.Popen(f"ls {apiPath}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
tdLog.info(f"test files: {out}")
if apiPath:
test_file_cmd = os.sep.join([apiPath, "passwdTest localhost"])
try:
p = subprocess.Popen(test_file_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if 0 != p.returncode:
tdLog.exit("Failed to run passwd test with output: %s \n error: %s" % (out, err))
else:
tdLog.info(out)
tdLog.success(f"{__file__} successfully executed")
except Exception as e:
tdLog.exit(f"Failed to execute {__file__} with error: {e}")
else:
tdLog.exit("passwdTest.c not found")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -44,6 +44,7 @@
,,y,army,./pytest.sh python3 ./test.py -f storage/compressBasic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f grant/grantBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/queryBugs.py -N 3
,,n,army,python3 ./test.py -f user/test_passwd.py
,,y,army,./pytest.sh python3 ./test.py -f tmq/tmqBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_compare_asc_desc.py
,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py
@ -51,6 +52,7 @@
,,y,army,./pytest.sh python3 ./test.py -f query/sys/tb_perf_queries_exist_test.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/test_having.py
,,n,army,python3 ./test.py -f tmq/drop_lost_comsumers.py
#
# system test
#

View File

@ -13,7 +13,7 @@ all: $(TARGET)
exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stmt2-test.c -o $(ROOT)stmt2-test $(LFLAGS)
# gcc $(CFLAGS) ./stmt2-test.c -o $(ROOT)stmt2-test $(LFLAGS)
gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS)
gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS)
gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS)
@ -22,11 +22,11 @@ exe:
gcc $(CFLAGS) ./insert_stb.c -o $(ROOT)insert_stb $(LFLAGS)
gcc $(CFLAGS) ./tmqViewTest.c -o $(ROOT)tmqViewTest $(LFLAGS)
gcc $(CFLAGS) ./stmtQuery.c -o $(ROOT)stmtQuery $(LFLAGS)
gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS)
gcc $(CFLAGS) ./stmt2.c -o $(ROOT)stmt2 $(LFLAGS)
gcc $(CFLAGS) ./stmt2-example.c -o $(ROOT)stmt2-example $(LFLAGS)
gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS)
gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
# gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS)
# gcc $(CFLAGS) ./stmt2.c -o $(ROOT)stmt2 $(LFLAGS)
# gcc $(CFLAGS) ./stmt2-example.c -o $(ROOT)stmt2-example $(LFLAGS)
# gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS)
# gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
clean:

View File

@ -0,0 +1,20 @@
# Makefile.mak for win64
TARGET = passwdTest.exe
CC = cl
CFLAGS = /W4 /EHsc /I"C:\TDengine\include" /DWINDOWS
LDFLAGS = /link /LIBPATH:"C:\TDengine\driver" taos.lib
SRCS = passwdTest.c
OBJS = $(SRCS:.c=.obj)
all: $(TARGET)
$(TARGET): $(OBJS)
$(CC) $(OBJS) $(LDFLAGS)
.c.obj:
$(CC) $(CFLAGS) /c $<
clean:
del $(OBJS) $(TARGET)

View File

@ -20,12 +20,27 @@
* passwdTest.c
* - Run the test case in clear TDengine environment with default root passwd 'taosdata'
*/
#ifdef WINDOWS
#include <winsock2.h>
#include <windows.h>
#include <stdint.h>
#ifndef PRId64
#define PRId64 "I64d"
#endif
#ifndef PRIu64
#define PRIu64 "I64u"
#endif
#else
#include <inttypes.h>
#include <unistd.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "taos.h" // TAOS header file
#define nDup 1
@ -50,6 +65,16 @@ void sysInfoTest(TAOS *taos, const char *host, char *qstr);
void userDroppedTest(TAOS *taos, const char *host, char *qstr);
void clearTestEnv(TAOS *taos, const char *host, char *qstr);
void taosMsleep(int64_t ms) {
if (ms < 0) return;
#ifdef WINDOWS
Sleep(ms);
#else
usleep(ms * 1000);
#endif
}
int nPassVerNotified = 0;
int nUserDropped = 0;
TAOS *taosu[nRoot] = {0};
@ -59,7 +84,8 @@ void __taos_notify_cb(void *param, void *ext, int type) {
switch (type) {
case TAOS_NOTIFY_PASSVER: {
++nPassVerNotified;
printf("%s:%d type:%d user:%s passVer:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext);
printf("%s:%d type:%d user:%s passVer:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL",
*(int *)ext);
break;
}
case TAOS_NOTIFY_USER_DROPPED: {
@ -191,11 +217,11 @@ static int printResult(TAOS_RES *res, char *output) {
printRow(temp, row, fields, numFields);
puts(temp);
}
return 0;
}
int main(int argc, char *argv[]) {
char qstr[1024];
// connect to server
if (argc < 2) {
printf("please input server-ip \n");
@ -215,6 +241,7 @@ int main(int argc, char *argv[]) {
taos_close(taos);
taos_cleanup();
exit(EXIT_SUCCESS);
}
void createUsers(TAOS *taos, const char *host, char *qstr) {
@ -234,6 +261,7 @@ void createUsers(TAOS *taos, const char *host, char *qstr) {
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb(TAOS_NOTIFY_PASSVER) for user:%s since %d\n", users[i], code);
exit(EXIT_FAILURE);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb(TAOS_NOTIFY_PASSVER) for user:%s\n", users[i]);
}
@ -260,6 +288,7 @@ void passVerTestMulti(const char *host, char *qstr) {
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb since %d\n", code);
exit(EXIT_FAILURE);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb\n");
}
@ -283,26 +312,25 @@ void passVerTestMulti(const char *host, char *qstr) {
printf("%s:%d [%d] second(s) elasped, passVer notification received:%d, total:%d\n", __func__, __LINE__, i,
nPassVerNotified, nConn);
if (nPassVerNotified >= nConn) break;
sleep(1);
taosMsleep(1000);
}
// close the taos_conn
for (int i = 0; i < nRoot; ++i) {
taos_close(taos[i]);
printf("%s:%d close taos[%d]\n", __func__, __LINE__, i);
// sleep(1);
// taosMsleep(1000);
}
for (int i = 0; i < nUser; ++i) {
taos_close(taosu[i]);
printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i);
// sleep(1);
// taosMsleep(1000);
}
fprintf(stderr, "######## %s #########\n", __func__);
if (nPassVerNotified == nConn) {
fprintf(stderr, ">>> succeed to get passVer notification since nNotify %d == nConn %d\n", nPassVerNotified,
nConn);
fprintf(stderr, ">>> succeed to get passVer notification since nNotify %d == nConn %d\n", nPassVerNotified, nConn);
} else {
fprintf(stderr, ">>> failed to get passVer notification since nNotify %d != nConn %d\n", nPassVerNotified, nConn);
exit(1);
@ -356,7 +384,7 @@ _REP:
fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__);
for (int i = 1; i <= 2; ++i) {
sleep(1);
taosMsleep(1000);
}
res = taos_query(taos[0], qstr);
@ -372,10 +400,10 @@ _REP:
queryDB(taosRoot, "alter user user0 sysinfo 1");
fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__);
for (int i = 1; i <= 2; ++i) {
sleep(1);
taosMsleep(1000);
}
if(++nRep < 5) {
if (++nRep < 5) {
goto _REP;
}
@ -390,7 +418,7 @@ _REP:
fprintf(stderr, "######## %s #########\n", __func__);
}
static bool isDropUser = true;
void userDroppedTest(TAOS *taos, const char *host, char *qstr) {
void userDroppedTest(TAOS *taos, const char *host, char *qstr) {
// users
int nTestUsers = nUser;
int nLoop = 0;
@ -408,6 +436,7 @@ _loop:
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb:%d for user:%s since %d\n", TAOS_NOTIFY_USER_DROPPED, users[i],
code);
exit(EXIT_FAILURE);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb:%d for user:%s\n", TAOS_NOTIFY_USER_DROPPED, users[i]);
}
@ -426,7 +455,7 @@ _loop:
printf("%s:%d [%d] second(s) elasped, user dropped notification received:%d, total:%d\n", __func__, __LINE__, i,
nUserDropped, nConn);
if (nUserDropped >= nConn) break;
sleep(1);
taosMsleep(1000);
}
for (int i = 0; i < nTestUsers; ++i) {

View File

@ -604,7 +604,7 @@ class TSMATestSQLGenerator:
class TDTestCase:
updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 3}
updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 3, 'debugFlag': 143}
def __init__(self):
self.vgroups = 4
@ -804,8 +804,8 @@ class TDTestCase:
self.tsma_tester.check_sql(ctx.sql, ctx)
def test_query_with_tsma(self):
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '30m')
self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
self.test_query_with_tsma_interval()
@ -1237,6 +1237,40 @@ class TDTestCase:
clust_dnode_nums = len(cluster_dnode_list)
if clust_dnode_nums > 1:
self.test_redistribute_vgroups()
tdSql.execute("drop tsma test.tsma5")
for _ in range(4):
self.test_td_32519()
def test_td_32519(self):
self.create_recursive_tsma('tsma1', 'tsma_r', 'test', '1h', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'])
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1)
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1)
tdSql.execute('DROP TABLE test.t1', queryTimes=1)
self.wait_query_err('desc test.`404e15422d96c8b5de9603c2296681b1`', 10, -2147473917)
self.wait_query_err('desc test.`82b56f091c4346369da0af777c3e580d`', 10, -2147473917)
self.wait_query_err('desc test.`163b7c69922cf6d83a98bfa44e52dade`', 10, -2147473917)
tdSql.execute('CREATE TABLE test.t1 USING test.meters TAGS(1, "a", "b", 1,1,1)')
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:59:00", 3,1,1,1,1,1,1, "a", "a")', queryTimes=1)
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 12:10:00", 4,1,1,1,1,1,1, "a", "a")', queryTimes=1)
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 12:20:00", 5,1,1,1,1,1,1, "a", "a")', queryTimes=1)
tdSql.execute('FLUSH DATABASE test', queryTimes=1)
tdSql.query('SELECT * FROM test.t1', queryTimes=1)
tdSql.checkRows(3)
sql = 'SELECT * FROM test.`404e15422d96c8b5de9603c2296681b1`'
self.wait_query(sql, 3, 20) ## tsma1 output ctb for t1
tdSql.query(sql, queryTimes=1)
tdSql.checkData(0,1, 1)
tdSql.checkData(1,1, 1)
tdSql.checkData(2,1, 1)
#sql = 'select * from test.`82b56f091c4346369da0af777c3e580d`'
#self.wait_query(sql, 2, 10) ## tsma2 output ctb for t1
#tdSql.query(sql, queryTimes=1)
#tdSql.checkData(0, 1, 1)
#tdSql.checkData(1, 1, 2)
sql = 'select * from test.`163b7c69922cf6d83a98bfa44e52dade`'
self.wait_query(sql, 2, 20) ## tsma_r output ctb for t1
tdSql.checkData(0, 1, 1)
self.drop_tsma('tsma_r', 'test')
def test_create_tsma(self):
function_name = sys._getframe().f_code.co_name