Merge branch '3.0' into fix/TD-17511

This commit is contained in:
Ganlin Zhao 2022-07-19 16:48:58 +08:00
commit dea45f6cfb
22 changed files with 134 additions and 339 deletions

View File

@ -162,9 +162,12 @@ typedef struct SRequestConnInfo {
SEpSet mgmtEps;
} SRequestConnInfo;
typedef void (*__freeFunc)(void *param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; // async callback function
STargetInfo target; // for update epset
__freeFunc paramFreeFp;
void* param;
uint64_t requestId;
uint64_t requestObjRefId;
@ -188,6 +191,8 @@ int32_t cleanupTaskQueue();
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
bool persistHandle, void* ctx);

View File

@ -286,13 +286,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (pInst == NULL || NULL == *pInst) {
taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key);
taosMemoryFreeClear(param);
tFreeClientHbBatchRsp(&pRsp);
return -1;
}
taosMemoryFreeClear(param);
if (code != 0) {
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
@ -716,6 +713,7 @@ static void *hbThreadFunc(void *param) {
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = strdup(pAppHbMgr->key);
pInfo->paramFreeFp = taosMemoryFree;
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;

View File

@ -29,7 +29,6 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) {
@ -1215,13 +1214,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
return pMsgSendInfo;
}
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->target.dbFName);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
if (NULL == pEpSet) {
return;

View File

@ -255,6 +255,8 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
}
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
@ -278,6 +280,8 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
}
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) {
SExecResult* pRes = &pRequest->body.resInfo.execRes;
@ -387,6 +391,8 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
tFreeSShowVariablesRsp(&rsp);
}
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {

View File

@ -1217,6 +1217,9 @@ static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *c
kv->value = value;
kv->length = valueLen;
if (isTag) {
if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
kv->type = TSDB_DATA_TYPE_NCHAR;
} else {
int32_t ret = smlParseValue(kv, msg);

View File

@ -504,6 +504,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg

View File

@ -556,7 +556,7 @@ static void taosSetSystemCfg(SConfig *pCfg) {
osSetSystemLocale(locale, charset);
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
taosSetConsoleEcho(enableCore);
taosSetCoreDump(enableCore);
// todo
tsVersion = 30000000;
@ -675,7 +675,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
case 'e': {
if (strcasecmp("enableCoreFile", name) == 0) {
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
taosSetConsoleEcho(enableCore);
taosSetCoreDump(enableCore);
}
break;
}

View File

@ -153,23 +153,22 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
return terrno;
}
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
if (insertFp != NULL) {
code = (*insertFp)(pSdb, pRow->pObj);
if (code != 0) {
code = terrno;
taosThreadRwlockWrlock(pLock);
taosHashRemove(hash, pRow->pObj, keySize);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = code;
taosThreadRwlockUnlock(pLock);
return terrno;
}
}
taosThreadRwlockUnlock(pLock);
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
@ -194,7 +193,6 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update");
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
@ -202,6 +200,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
}
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pNewRow, false);
pSdb->tableVer[pOldRow->type]++;

View File

@ -1453,38 +1453,71 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
(pBlock->minVersion <= pVerRange->maxVer);
}
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL) {
return false;
}
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
// ts is not overlap
if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
return false;
}
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
// version is not overlap
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) {
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
if (p->version >= pBlock->minVersion) {
return true;
}
} else if (p->ts > pBlock->maxKey.ts) {
} else if (p->ts < pBlock->minKey.ts) { // p->ts < pBlock->minKey.ts
if (p->version >= pBlock->minVersion) {
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (i + 1 == num - 1) { // pnext is the last point
if (pnext->ts >= pBlock->minKey.ts) {
return true;
}
} else {
if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) {
return true;
}
}
} else { // it must be the last point
ASSERT(p->version == 0);
}
}
} else { // (p->ts > pBlock->maxKey.ts) {
return false;
}
}
ASSERT(0);
return false;
}
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL) {
return false;
}
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
return false;
}
// version is not overlap
if (ASCENDING_TRAVERSE(order)) {
return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
} else {
int32_t index = pBlockScanInfo->fileDelIndex;
while(1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pBlock->minKey.ts && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
break;
}
}
return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
}
}
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other

View File

@ -532,6 +532,14 @@ typedef struct SCtgOperation {
} \
} while (0)
#define CTG_API_JENTER() do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_ERR_JRET(TSDB_CODE_CTG_OUT_OF_SERVICE); \
} \
} while (0)
#define CTG_API_LEAVE_NOLOCK(c) do { \
int32_t __code = c; \

View File

@ -244,10 +244,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
int32_t code = 0;
SCtgJob* pJob = NULL;
CTG_API_ENTER();
CTG_API_JENTER();
SCtgJob* pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
if (NULL == pJob) {
qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId);
goto _return;
@ -266,8 +267,6 @@ _return:
if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
}
taosMemoryFree(param);
CTG_API_LEAVE(code);
}
@ -293,6 +292,7 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg
param->taskId = pTask->taskId;
msgSendInfo->param = param;
msgSendInfo->paramFreeFp = taosMemoryFree;
msgSendInfo->fp = ctgHandleMsgCallback;
*pMsgSendInfo = msgSendInfo;

View File

@ -91,7 +91,6 @@ _return:
tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData);
taosMemoryFree(param);
return TSDB_CODE_SUCCESS;
}
@ -110,6 +109,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
pParam->pInserter = pInserter;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);
pMsgSendInfo->msgType = TDMT_VND_SUBMIT;

View File

@ -1994,16 +1994,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&pExchangeInfo->ready);
taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
taosMemoryFree(pWrapper);
return TSDB_CODE_SUCCESS;
}
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
@ -2063,6 +2056,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = pSource->fetchMsgType;

View File

@ -1196,7 +1196,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
if (pResCol->info.colId == pColMatchInfo->colId) {
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
colExists = true;
break;
}
@ -1435,6 +1434,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
}
qDebug("scan rows: %d", pBlockInfo->rows);
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
@ -1507,8 +1507,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
goto _error;
}
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
pInfo->pTagCond = pTagCond;

View File

@ -1923,15 +1923,18 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode**
return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond);
}
bool needOutput = false;
switch (classifyCondition(*pCondition)) {
case COND_TYPE_PRIMARY_KEY:
if (NULL != pPrimaryKeyCond) {
*pPrimaryKeyCond = *pCondition;
needOutput = true;
}
break;
case COND_TYPE_TAG_INDEX:
if (NULL != pTagIndexCond) {
*pTagIndexCond = *pCondition;
needOutput = true;
}
if (NULL != pTagCond) {
SNode* pTempCond = *pCondition;
@ -1942,21 +1945,26 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode**
}
}
*pTagCond = pTempCond;
needOutput = true;
}
break;
case COND_TYPE_TAG:
if (NULL != pTagCond) {
*pTagCond = *pCondition;
needOutput = true;
}
break;
case COND_TYPE_NORMAL:
default:
if (NULL != pOtherCond) {
*pOtherCond = *pCondition;
needOutput = true;
}
break;
}
*pCondition = NULL;
if (needOutput) {
*pCondition = NULL;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -138,6 +138,16 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return 0;
}
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->target.dbFName);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
if (pMsgBody->paramFreeFp) {
(*pMsgBody->paramFreeFp)(pMsgBody->param);
}
taosMemoryFreeClear(pMsgBody);
}
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
bool persistHandle, void* rpcCtx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);

View File

@ -277,7 +277,7 @@ typedef struct SSchJob {
bool fetched;
int32_t resNumOfRows;
SSchResInfo userRes;
const char *sql;
char *sql;
SQueryProfileSummary summary;
} SSchJob;
@ -461,7 +461,6 @@ int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);

View File

@ -675,6 +675,7 @@ void schFreeJobImpl(void *job) {
taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes);
taosMemoryFreeClear(pJob->sql);
taosMemoryFree(pJob);
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
@ -718,7 +719,9 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql;
if (pReq->sql) {
pJob->sql = strdup(pReq->sql);
}
pJob->pDag = pReq->pDag;
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;

View File

@ -386,7 +386,6 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
schProcessOnCbEnd(pJob, pTask, code);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(param);
qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle,
tstrerror(rspCode));
@ -398,7 +397,6 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code);
taosMemoryFreeClear(param);
return TSDB_CODE_SUCCESS;
}
@ -447,8 +445,8 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return:
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
taosMemoryFree(pMsg->pData);
SCH_RET(code);
}
@ -514,7 +512,9 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msgSendInfo->paramFreeFp = taosMemoryFree;
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
if (pJob) {
@ -535,7 +535,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
_return:
schFreeSMsgSendInfo(msgSendInfo);
destroySendMsgInfo(msgSendInfo);
SCH_RET(code);
}
@ -676,6 +676,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
param->pTrans = pJob->conn.pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
@ -795,6 +796,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
pDst->param = NULL;
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
pDst->paramFreeFp = taosMemoryFree;
*dst = pDst;
@ -861,8 +863,7 @@ _return:
}
if (pMsgSendInfo) {
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
destroySendMsgInfo(pMsgSendInfo);
}
SCH_RET(code);

View File

@ -50,6 +50,12 @@ char* schGetOpStr(SCH_OP_TYPE type) {
}
}
void schFreeHbTrans(SSchHbTrans *pTrans) {
rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT);
schFreeRpcCtx(&pTrans->rpcCtx);
}
void schCleanClusterHb(void* pTrans) {
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
@ -57,7 +63,7 @@ void schCleanClusterHb(void* pTrans) {
while (hb) {
if (hb->trans.pTrans == pTrans) {
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
}
@ -68,8 +74,6 @@ void schCleanClusterHb(void* pTrans) {
}
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY
int32_t code = 0;
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
@ -82,7 +86,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
int64_t taskNum = atomic_load_64(&hb->taskNum);
if (taskNum <= 0) {
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
@ -265,9 +269,7 @@ void schFreeRpcCtxVal(const void *arg) {
}
SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo->msgInfo.pData);
taosMemoryFreeClear(pMsgSendInfo);
destroySendMsgInfo(pMsgSendInfo);
}
void schFreeRpcCtx(SRpcCtx *pCtx) {
@ -290,15 +292,6 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
}
}
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo) {
if (NULL == msgSendInfo) {
return;
}
taosMemoryFree(msgSendInfo->param);
taosMemoryFree(msgSendInfo);
}
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {

View File

@ -120,15 +120,18 @@ void syncRespClean(SSyncRespMgr *pObj) {
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
int cnt = 0;
int sum = 0;
SSyncNode *pSyncNode = pObj->data;
SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
ASSERT(delIndexArray != NULL);
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
while (pStub) {
size_t len;
void * key = taosHashGetKey(pStub, &len);
void *key = taosHashGetKey(pStub, &len);
uint64_t *pSeqNum = (uint64_t *)key;
sum++;
int64_t nowMS = taosGetTimestampMs();
if (nowMS - pStub->createTime > ttl) {
@ -155,7 +158,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
}
int32_t arraySize = taosArrayGetSize(delIndexArray);
sDebug("vgId:%d, resp mgr clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize);
sDebug("vgId:%d, resp mgr end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
for (int32_t i = 0; i < arraySize; ++i) {
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);

View File

@ -1,260 +0,0 @@
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 1
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 1200,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables
# paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
# queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("run select sql from db")
tdSql.query(queryString)
expectrowcnt = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 5000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1()
# self.tmqCase2()
# self.prepareTestEnv()
# tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.snapshot = 1
# self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())