Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/ly_query
This commit is contained in:
commit
dfe3d5a9bc
|
@ -148,7 +148,7 @@ T = latest event time - watermark
|
|||
|
||||
The window closing time for each batch of data that arrives at the system is updated using the preceding formula, and all windows are closed whose closing time is less than T. If the triggering method is WINDOW_CLOSE or MAX_DELAY, the aggregate result for the window is pushed.
|
||||
|
||||
Stream processing strategy for expired data
|
||||
## Stream processing strategy for expired data
|
||||
The data in expired windows is tagged as expired. TDengine stream processing provides two methods for handling such data:
|
||||
|
||||
1. Drop the data. This is the default and often only handling method for most stream processing engines.
|
||||
|
@ -157,6 +157,14 @@ The data in expired windows is tagged as expired. TDengine stream processing pro
|
|||
|
||||
In both of these methods, configuring the watermark is essential for obtaining accurate results (if expired data is dropped) and avoiding repeated triggers that affect system performance (if expired data is recalculated).
|
||||
|
||||
## Stream processing strategy for modifying data
|
||||
|
||||
TDengine provides two ways to handle modified data, which are specified by the IGNORE UPDATE option:
|
||||
|
||||
1. Check whether the data has been modified, i.e. IGNORE UPDATE 0, and recalculate the corresponding window if the data has been modified.
|
||||
|
||||
2. Do not check whether the data has been modified, and calculate all the data as incremental data, i.e. IGNORE UPDATE 1, the default configuration.
|
||||
|
||||
## Supported functions
|
||||
|
||||
All [scalar functions](../function/#scalar-functions) are available in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings:
|
||||
|
|
|
@ -201,9 +201,9 @@ TDengine 对于过期数据提供两种处理方式,由 IGNORE EXPIRED 选项
|
|||
|
||||
TDengine 对于修改数据提供两种处理方式,由 IGNORE UPDATE 选项指定:
|
||||
|
||||
1. 检查数据是否被修改,即 IGNORE UPDATE 0:默认配置,如果被修改,则重新计算对应窗口。
|
||||
1. 检查数据是否被修改,即 IGNORE UPDATE 0,如果数据被修改,则重新计算对应窗口。
|
||||
|
||||
2. 不检查数据是否被修改,全部按增量数据计算,即 IGNORE UPDATE 1。
|
||||
2. 不检查数据是否被修改,全部按增量数据计算,即 IGNORE UPDATE 1,默认配置。
|
||||
|
||||
|
||||
## 写入已存在的超级表
|
||||
|
|
|
@ -857,6 +857,58 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool mndTransActionsOfSameType(SArray *pActions) {
|
||||
int32_t size = taosArrayGetSize(pActions);
|
||||
ETrnAct lastActType = TRANS_ACTION_NULL;
|
||||
bool same = true;
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STransAction *pAction = taosArrayGet(pActions, i);
|
||||
if (i > 0) {
|
||||
if (lastActType != pAction->actionType) {
|
||||
same = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
lastActType = pAction->actionType;
|
||||
}
|
||||
return same;
|
||||
}
|
||||
|
||||
static int32_t mndTransCheckParallelActions(SMnode *pMnode, STrans *pTrans) {
|
||||
if (pTrans->exec == TRN_EXEC_PARALLEL) {
|
||||
if (mndTransActionsOfSameType(pTrans->redoActions) == false) {
|
||||
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
|
||||
mError("trans:%d, types of parallel redo actions are not the same", pTrans->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
|
||||
if (mndTransActionsOfSameType(pTrans->undoActions) == false) {
|
||||
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
|
||||
mError("trans:%d, types of parallel undo actions are not the same", pTrans->id);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
|
||||
if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
|
||||
mError("trans:%d, commit actions of non-changeless trans are empty", pTrans->id);
|
||||
return -1;
|
||||
}
|
||||
if (mndTransActionsOfSameType(pTrans->commitActions) == false) {
|
||||
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
|
||||
mError("trans:%d, types of commit actions are not the same", pTrans->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||
if (pTrans == NULL) return -1;
|
||||
|
||||
|
@ -864,9 +916,11 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
if (mndTransCheckParallelActions(pMnode, pTrans) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndTransCheckCommitActions(pMnode, pTrans) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1283,24 +1337,25 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
|||
|
||||
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
|
||||
mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, topHalf:%d", pTrans->id, terrstr(), terrno,
|
||||
topHalf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to execute undoActions since %s", terrstr());
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
|
||||
mError("trans:%d, failed to execute undoActions since %s. topHalf:%d", pTrans->id, terrstr(), topHalf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to execute commitActions since %s", terrstr());
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
|
||||
mError("trans:%d, failed to execute commitActions since %s. topHalf:%d", pTrans->id, terrstr(), topHalf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -311,7 +311,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
|
||||
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
|
||||
pTq->pVnode->config.tsdbCfg.precision);
|
||||
totalRows += pBlock->info.rows;
|
||||
*totalRows += pBlock->info.rows;
|
||||
blockDataFreeRes(pBlock);
|
||||
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
|
||||
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||
|
|
|
@ -45,8 +45,8 @@ static FilterCondType checkTagCond(SNode* cond);
|
|||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
|
||||
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI);
|
||||
|
||||
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
|
||||
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
|
||||
|
||||
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
|
||||
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
|
||||
|
@ -642,7 +642,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
info->groupId = calcGroupId(keyBuf, len);
|
||||
if (initRemainGroups) {
|
||||
// groupId ~ table uid
|
||||
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid));
|
||||
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
|
||||
sizeof(info->uid));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -858,7 +859,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
|
|||
}
|
||||
|
||||
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
||||
SStorageAPI* pStorageAPI) {
|
||||
SStorageAPI* pStorageAPI) {
|
||||
SSDataBlock* pResBlock = createDataBlock();
|
||||
if (pResBlock == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -939,11 +940,12 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
|
|||
return pResBlock;
|
||||
}
|
||||
|
||||
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) {
|
||||
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
|
||||
bool* pResultList, bool addUid) {
|
||||
taosArrayClear(pUidList);
|
||||
|
||||
STableKeyInfo info = {.uid = 0, .groupId = 0};
|
||||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
if (pResultList[i]) {
|
||||
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
|
||||
|
@ -1143,7 +1145,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
|||
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
|
||||
qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
|
||||
} else {
|
||||
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
|
||||
qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1165,7 +1167,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
|||
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
|
||||
}
|
||||
|
||||
pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
|
||||
pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
|
||||
pPayload, size, 1);
|
||||
digest[0] = 1;
|
||||
memcpy(digest + 1, context.digest, tListLen(context.digest));
|
||||
}
|
||||
|
@ -1725,7 +1728,8 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
|
|||
return c;
|
||||
}
|
||||
|
||||
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) {
|
||||
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
|
||||
const SReadHandle* readHandle) {
|
||||
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
||||
|
||||
|
@ -1748,8 +1752,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
|||
|
||||
// allowed read stt file optimization mode
|
||||
pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
|
||||
(pTableScanNode->scan.node.pConditions == NULL) &&
|
||||
(pTableScanNode->interval == 0);
|
||||
(pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
|
||||
|
||||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
|
@ -1891,7 +1894,8 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde
|
|||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
int64_t slidingEnd =
|
||||
taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||
}
|
||||
|
||||
|
@ -2136,7 +2140,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
if (groupSort && groupByTbname) {
|
||||
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||
} else if (groupByTbname && pScanNode->groupOrderScan){
|
||||
} else if (groupByTbname && pScanNode->groupOrderScan) {
|
||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||
} else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
|
||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||
|
@ -2147,7 +2151,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
bool initRemainGroups = false;
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
|
||||
if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && !(groupSort || pScanNode->groupOrderScan)) {
|
||||
if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
|
||||
!(groupSort || pScanNode->groupOrderScan)) {
|
||||
initRemainGroups = true;
|
||||
}
|
||||
}
|
||||
|
@ -2271,7 +2276,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
|
|||
}
|
||||
if (qDebugFlag & DEBUG_DEBUG) {
|
||||
char* pBuf = NULL;
|
||||
char flagBuf[64];
|
||||
char flagBuf[64];
|
||||
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
|
||||
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
|
||||
taosMemoryFree(pBuf);
|
||||
|
@ -2280,7 +2285,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
|
|||
|
||||
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
|
||||
|
||||
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
|
||||
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
|
||||
int64_t* ts = (int64_t*)pColData->pData;
|
||||
|
||||
int64_t duration = pWin->ekey - pWin->skey + delta;
|
||||
|
@ -2289,13 +2294,14 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
|
|||
ts[4] = pWin->ekey + delta; // window end key
|
||||
}
|
||||
|
||||
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
|
||||
int32_t rowIndex) {
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
const char* isNull = oldkeyBuf;
|
||||
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
|
||||
|
||||
for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
|
||||
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
|
||||
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
|
||||
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
|
||||
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
|
||||
|
||||
|
@ -2321,8 +2327,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
|
||||
int32_t rowIndex) {
|
||||
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
uint32_t colNum = pSortGroupCols->size;
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
char* isNull = keyBuf;
|
||||
|
@ -2370,7 +2375,7 @@ uint64_t calcGroupId(char* pData, int32_t len) {
|
|||
}
|
||||
|
||||
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
|
||||
SNode* node;
|
||||
SNode* node;
|
||||
SNodeList* ret = NULL;
|
||||
FOREACH(node, pSortKeys) {
|
||||
SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
|
||||
|
@ -2386,6 +2391,6 @@ int32_t extractKeysLen(const SArray* keys) {
|
|||
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
|
||||
len += pCol->bytes;
|
||||
}
|
||||
len += sizeof(int8_t) * keyNum; //null flag
|
||||
len += sizeof(int8_t) * keyNum; // null flag
|
||||
return len;
|
||||
}
|
||||
|
|
|
@ -983,7 +983,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
|
|||
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
|
||||
tagVarChar = taosMemoryCalloc(1, bufSize + 1);
|
||||
int32_t len = -1;
|
||||
convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
|
||||
if (tagLen > 0)
|
||||
convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
|
||||
else
|
||||
len = 0;
|
||||
varDataSetLen(tagVarChar, len);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3201,15 +3201,15 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
|
|||
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||
prepareBuf(pCtx);
|
||||
|
||||
SWinKey key;
|
||||
SWinKey key = {0};
|
||||
if (pCtx->saveHandle.pBuf == NULL) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
|
||||
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
int64_t skey = *(int64_t*)colDataGetData(pColInfo, rowIndex);
|
||||
|
||||
key.groupId = pSrcBlock->info.id.groupId;
|
||||
key.ts = skey;
|
||||
SColumnInfoData* pColInfo = pCtx->input.pPTS;
|
||||
if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
|
||||
}
|
||||
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
key.groupId = pSrcBlock->info.id.groupId;
|
||||
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
|
||||
}
|
||||
|
||||
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
||||
|
|
|
@ -486,6 +486,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId,
|
||||
code);
|
||||
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
|
||||
if (pMsg) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
|
@ -526,6 +527,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
|
||||
if (code) {
|
||||
qError("hb rsp error:%s", tstrerror(code));
|
||||
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
|
@ -1181,7 +1183,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
qMsg.queryId = pJob->queryId;
|
||||
qMsg.taskId = pTask->taskId;
|
||||
qMsg.refId = pJob->refId;
|
||||
qMsg.execId = pTask->execId;
|
||||
qMsg.execId = *(int32_t*)param;
|
||||
|
||||
msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
|
||||
if (msgSize < 0) {
|
||||
|
|
|
@ -371,14 +371,13 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
|
|||
pCtx->roundTotal = pEpSet->numOfEps;
|
||||
}
|
||||
|
||||
|
||||
if (pCtx->roundTimes >= pCtx->roundTotal) {
|
||||
int64_t nowTs = taosGetTimestampMs();
|
||||
int64_t lastTime = nowTs - pCtx->startTs;
|
||||
if (lastTime > tsMaxRetryWaitTime) {
|
||||
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
|
||||
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
|
||||
pJob->noMoreRetry = true;
|
||||
pJob->noMoreRetry = true;
|
||||
SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
|
||||
}
|
||||
|
||||
|
@ -418,7 +417,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
|
|||
taosMemoryFreeClear(pTask->msg);
|
||||
pTask->msgLen = 0;
|
||||
pTask->lastMsgType = 0;
|
||||
pTask->childReady = 0;
|
||||
pTask->childReady = 0;
|
||||
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
||||
}
|
||||
|
||||
|
@ -505,11 +504,11 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i
|
|||
pLevel->taskExecDoneNum = 0;
|
||||
pLevel->taskLaunchedNum = 0;
|
||||
}
|
||||
|
||||
|
||||
SCH_RESET_JOB_LEVEL_IDX(pJob);
|
||||
|
||||
|
||||
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
|
||||
|
||||
|
||||
taosMemoryFreeClear(pData->pData);
|
||||
taosMemoryFreeClear(pData->pEpSet);
|
||||
|
||||
|
@ -627,7 +626,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
|||
pTask->maxRetryTimes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
|
||||
pTask->maxExecTimes++;
|
||||
pTask->maxRetryTimes++;
|
||||
|
@ -862,7 +861,9 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
|
|||
while (nodeInfo) {
|
||||
if (nodeInfo->handle) {
|
||||
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
|
||||
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, NULL);
|
||||
void *pExecId = taosHashGetKey(nodeInfo, NULL);
|
||||
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);
|
||||
|
||||
SCH_TASK_DLOG("start to drop task's %dth execNode", i);
|
||||
} else {
|
||||
SCH_TASK_DLOG("no need to drop task %dth execNode", i);
|
||||
|
@ -901,7 +902,6 @@ int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
|
||||
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
|
||||
SSchTask *pTask = NULL;
|
||||
|
@ -1269,7 +1269,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));
|
||||
|
||||
|
||||
void *pIter = taosHashIterate(list, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *pTask = *(SSchTask **)pIter;
|
||||
|
@ -1277,7 +1277,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t
|
|||
SCH_LOCK_TASK(pTask);
|
||||
code = schNotifyTaskOnExecNode(pJob, pTask, type);
|
||||
SCH_UNLOCK_TASK(pTask);
|
||||
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
|
@ -1289,7 +1289,6 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
|
||||
SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL));
|
||||
}
|
||||
|
|
|
@ -119,7 +119,11 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
|
|||
|
||||
// add ref for task
|
||||
SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
|
||||
ASSERT(p != NULL);
|
||||
if (p == NULL) {
|
||||
stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId,
|
||||
streamTaskGetStatus(pTask)->name);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
|
||||
|
||||
|
|
|
@ -380,12 +380,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->hTaskInfo.pTimer != NULL) {
|
||||
taosTmrStop(pTask->hTaskInfo.pTimer);
|
||||
/*bool ret = */taosTmrStop(pTask->hTaskInfo.pTimer);
|
||||
pTask->hTaskInfo.pTimer = NULL;
|
||||
}
|
||||
|
||||
if (pTask->msgInfo.pTimer != NULL) {
|
||||
taosTmrStop(pTask->msgInfo.pTimer);
|
||||
/*bool ret = */taosTmrStop(pTask->msgInfo.pTimer);
|
||||
pTask->msgInfo.pTimer = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,16 +19,12 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include <uv.h>
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "theap.h"
|
||||
#include "tmisce.h"
|
||||
#include "tmsg.h"
|
||||
#include "transLog.h"
|
||||
#include "transportInt.h"
|
||||
#include "trpc.h"
|
||||
#include "ttrace.h"
|
||||
#include "tutil.h"
|
||||
|
||||
typedef bool (*FilteFunc)(void* arg);
|
||||
|
||||
|
@ -115,9 +111,12 @@ typedef SRpcConnInfo STransHandleInfo;
|
|||
|
||||
// ref mgt handle
|
||||
typedef struct SExHandle {
|
||||
void* handle;
|
||||
int64_t refId;
|
||||
void* pThrd;
|
||||
void* handle;
|
||||
int64_t refId;
|
||||
void* pThrd;
|
||||
queue q;
|
||||
int8_t inited;
|
||||
SRWLatch latch;
|
||||
} SExHandle;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -92,6 +92,7 @@ typedef struct SCliMsg {
|
|||
int64_t refId;
|
||||
uint64_t st;
|
||||
int sent; //(0: no send, 1: alread sent)
|
||||
queue seqq;
|
||||
} SCliMsg;
|
||||
|
||||
typedef struct SCliThrd {
|
||||
|
@ -121,11 +122,7 @@ typedef struct SCliThrd {
|
|||
SHashObj* batchCache;
|
||||
|
||||
SCliMsg* stopMsg;
|
||||
|
||||
bool quit;
|
||||
|
||||
int newConnCount;
|
||||
SHashObj* msgCount;
|
||||
bool quit;
|
||||
} SCliThrd;
|
||||
|
||||
typedef struct SCliObj {
|
||||
|
@ -262,10 +259,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
|||
} \
|
||||
if (i == sz) { \
|
||||
pMsg = NULL; \
|
||||
tDebug("msg not found, %" PRIu64 "", ahandle); \
|
||||
} else { \
|
||||
pMsg = transQueueRm(&conn->cliMsgs, i); \
|
||||
tDebug("msg found, %" PRIu64 "", ahandle); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
@ -343,6 +338,34 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
|
|||
_RETURN:
|
||||
return false;
|
||||
}
|
||||
bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
|
||||
if (refId == 0) return false;
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||
if (exh == NULL) {
|
||||
tDebug("release conn %p, refId: %" PRId64 "", conn, refId);
|
||||
return false;
|
||||
}
|
||||
taosWLockLatch(&exh->latch);
|
||||
if (exh->handle == NULL) exh->handle = conn;
|
||||
exh->inited = 1;
|
||||
if (!QUEUE_IS_EMPTY(&exh->q)) {
|
||||
queue* h = QUEUE_HEAD(&exh->q);
|
||||
QUEUE_REMOVE(h);
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq);
|
||||
transCtxMerge(&conn->ctx, &t->ctx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, t);
|
||||
tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId);
|
||||
transReleaseExHandle(transGetRefMgt(), refId);
|
||||
cliSend(conn);
|
||||
return true;
|
||||
}
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
tDebug("empty conn %p, refId: %" PRId64 "", conn, refId);
|
||||
transReleaseExHandle(transGetRefMgt(), refId);
|
||||
return false;
|
||||
}
|
||||
|
||||
void cliHandleResp(SCliConn* conn) {
|
||||
SCliThrd* pThrd = conn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
@ -439,8 +462,14 @@ void cliHandleResp(SCliConn* conn) {
|
|||
return;
|
||||
}
|
||||
}
|
||||
int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle));
|
||||
tDebug("conn %p msg refId: %" PRId64 "", conn, refId);
|
||||
destroyCmsg(pMsg);
|
||||
|
||||
if (cliConnSendSeqMsg(refId, conn)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (cliMaySendCachedMsg(conn) == true) {
|
||||
return;
|
||||
}
|
||||
|
@ -451,6 +480,21 @@ void cliHandleResp(SCliConn* conn) {
|
|||
|
||||
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||
}
|
||||
static void cliDestroyMsgInExhandle(int64_t refId) {
|
||||
if (refId == 0) return;
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||
if (exh) {
|
||||
taosWLockLatch(&exh->latch);
|
||||
while (!QUEUE_IS_EMPTY(&exh->q)) {
|
||||
queue* h = QUEUE_HEAD(&exh->q);
|
||||
QUEUE_REMOVE(h);
|
||||
SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq);
|
||||
destroyCmsg(t);
|
||||
}
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
transReleaseExHandle(transGetRefMgt(), refId);
|
||||
}
|
||||
}
|
||||
|
||||
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
|
||||
if (transQueueEmpty(&pConn->cliMsgs)) {
|
||||
|
@ -510,6 +554,8 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
|
|||
}
|
||||
|
||||
if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
|
||||
int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle));
|
||||
cliDestroyMsgInExhandle(refId);
|
||||
if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
|
||||
return;
|
||||
}
|
||||
|
@ -678,7 +724,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
|||
}
|
||||
list->numOfConn++;
|
||||
}
|
||||
tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum);
|
||||
tDebug("%s numOfConn: %d, limit: %d, dst:%s", pTransInst->label, list->numOfConn, pTransInst->connLimitNum, key);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -742,13 +788,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
QUEUE_PUSH(&conn->list->conns, &conn->q);
|
||||
conn->list->size += 1;
|
||||
|
||||
if (conn->list->size >= 20) {
|
||||
if (conn->list->size >= 10) {
|
||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||
arg->param1 = conn;
|
||||
arg->param2 = thrd;
|
||||
|
||||
STrans* pTransInst = thrd->pTransInst;
|
||||
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
||||
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pTransInst->idleTime));
|
||||
}
|
||||
}
|
||||
static int32_t allocConnRef(SCliConn* conn, bool update) {
|
||||
|
@ -761,8 +807,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
|
|||
exh->handle = conn;
|
||||
exh->pThrd = conn->hostThrd;
|
||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||
conn->refId = exh->refId;
|
||||
QUEUE_INIT(&exh->q);
|
||||
taosInitRWLatch(&exh->latch);
|
||||
|
||||
conn->refId = exh->refId;
|
||||
if (conn->refId == -1) {
|
||||
taosMemoryFree(exh);
|
||||
}
|
||||
|
@ -779,9 +827,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
|
|||
if (exh == NULL) {
|
||||
return -1;
|
||||
}
|
||||
taosWLockLatch(&exh->latch);
|
||||
exh->handle = conn;
|
||||
exh->pThrd = conn->hostThrd;
|
||||
conn->refId = exh->refId;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
|
||||
transReleaseExHandle(transGetRefMgt(), handle);
|
||||
return 0;
|
||||
|
@ -882,7 +932,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
|||
}
|
||||
|
||||
conn->list = NULL;
|
||||
pThrd->newConnCount--;
|
||||
|
||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
|
@ -1190,7 +1239,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
|||
addr.sin_port = (uint16_t)htons(pList->port);
|
||||
|
||||
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
|
||||
pThrd->newConnCount++;
|
||||
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
|
||||
if (fd == -1) {
|
||||
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
|
||||
|
@ -1392,7 +1440,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
return;
|
||||
}
|
||||
|
||||
taosRLockLatch(&exh->latch);
|
||||
SCliConn* conn = exh->handle;
|
||||
taosRUnLockLatch(&exh->latch);
|
||||
|
||||
transReleaseExHandle(transGetRefMgt(), refId);
|
||||
tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
|
||||
|
||||
|
@ -1425,7 +1476,9 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr)
|
|||
*ignore = true;
|
||||
return NULL;
|
||||
} else {
|
||||
taosRLockLatch(&exh->latch);
|
||||
conn = exh->handle;
|
||||
taosRUnLockLatch(&exh->latch);
|
||||
if (conn == NULL) {
|
||||
conn = getConnFromPool2(pThrd, addr, pMsg);
|
||||
if (conn != NULL) specifyConnRef(conn, true, refId);
|
||||
|
@ -1439,7 +1492,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr)
|
|||
if (conn != NULL) {
|
||||
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
|
||||
} else {
|
||||
tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
|
||||
tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pTransInst)->label, pThrd->pool, addr);
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
@ -1598,7 +1651,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
addr.sin_port = (uint16_t)htons(port);
|
||||
|
||||
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
|
||||
pThrd->newConnCount++;
|
||||
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
|
||||
if (fd == -1) {
|
||||
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
|
||||
|
@ -1858,9 +1910,10 @@ void cliIteraConnMsgs(SCliConn* conn) {
|
|||
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
|
||||
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
||||
uint64_t ahandle = pHead->ahandle;
|
||||
tDebug("ahandle = %" PRIu64 "", ahandle);
|
||||
SCliMsg* pMsg = NULL;
|
||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
|
||||
tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn,
|
||||
conn->refId);
|
||||
|
||||
transClearBuffer(&conn->readBuf);
|
||||
transFreeMsg(transContFromHead((char*)pHead));
|
||||
|
@ -1869,6 +1922,9 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
|
|||
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
|
||||
if (cliMsg->type == Release) {
|
||||
ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
|
||||
tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn,
|
||||
conn->refId);
|
||||
cliDestroyConn(conn, true);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -1984,11 +2040,9 @@ static SCliThrd* createThrdObj(void* trans) {
|
|||
taosMemoryFree(pThrd);
|
||||
return NULL;
|
||||
}
|
||||
if (pTransInst->supportBatch) {
|
||||
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
|
||||
} else {
|
||||
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
|
||||
}
|
||||
int32_t nSync = pTransInst->supportBatch ? 4 : 8;
|
||||
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb);
|
||||
|
||||
if (pThrd->asyncPool == NULL) {
|
||||
tError("failed to init async pool");
|
||||
uv_loop_close(pThrd->loop);
|
||||
|
@ -2029,8 +2083,6 @@ static SCliThrd* createThrdObj(void* trans) {
|
|||
|
||||
pThrd->quit = false;
|
||||
|
||||
pThrd->newConnCount = 0;
|
||||
pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
return pThrd;
|
||||
}
|
||||
static void destroyThrdObj(SCliThrd* pThrd) {
|
||||
|
@ -2076,7 +2128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
|
||||
}
|
||||
taosHashCleanup(pThrd->batchCache);
|
||||
taosHashCleanup(pThrd->msgCount);
|
||||
taosMemoryFree(pThrd);
|
||||
}
|
||||
|
||||
|
@ -2095,14 +2146,7 @@ void cliSendQuit(SCliThrd* thrd) {
|
|||
void cliWalkCb(uv_handle_t* handle, void* arg) {
|
||||
if (!uv_is_closing(handle)) {
|
||||
if (uv_handle_get_type(handle) == UV_TIMER) {
|
||||
// SCliConn* pConn = handle->data;
|
||||
// if (pConn != NULL && pConn->timer != NULL) {
|
||||
// SCliThrd* pThrd = pConn->hostThrd;
|
||||
// uv_timer_stop((uv_timer_t*)handle);
|
||||
// handle->data = NULL;
|
||||
// taosArrayPush(pThrd->timerList, &pConn->timer);
|
||||
// pConn->timer = NULL;
|
||||
// }
|
||||
// do nothing
|
||||
} else {
|
||||
uv_read_stop((uv_stream_t*)handle);
|
||||
}
|
||||
|
@ -2137,18 +2181,23 @@ static void doCloseIdleConn(void* param) {
|
|||
cliDestroyConn(conn, true);
|
||||
taosMemoryFree(arg);
|
||||
}
|
||||
static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
|
||||
if (!(rpcDebugFlag & DEBUG_DEBUG)) {
|
||||
return;
|
||||
}
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
char tbuf[512] = {0};
|
||||
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
||||
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
|
||||
pCtx->retryNextInterval);
|
||||
return;
|
||||
}
|
||||
|
||||
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
|
||||
if (rpcDebugFlag & DEBUG_DEBUG) {
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
char tbuf[512] = {0};
|
||||
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
||||
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
|
||||
pCtx->retryStep, pCtx->retryNextInterval);
|
||||
}
|
||||
cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst));
|
||||
|
||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||
arg->param1 = pMsg;
|
||||
|
@ -2157,12 +2206,6 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
|
||||
}
|
||||
|
||||
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||
if (*val != exp) {
|
||||
*val = newVal;
|
||||
}
|
||||
}
|
||||
|
||||
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
||||
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
|
||||
return false;
|
||||
|
@ -2504,21 +2547,7 @@ int transReleaseCliHandle(void* handle) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
|
||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
|
||||
if (pThrd == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
|
||||
static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
epsetAssign(&pCtx->epSet, pEpSet);
|
||||
|
@ -2535,12 +2564,48 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
|||
cliMsg->st = taosGetTimestampUs();
|
||||
cliMsg->type = Normal;
|
||||
cliMsg->refId = (int64_t)shandle;
|
||||
QUEUE_INIT(&cliMsg->seqq);
|
||||
return cliMsg;
|
||||
}
|
||||
|
||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
|
||||
int64_t handle = (int64_t)pReq->info.handle;
|
||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle);
|
||||
if (pThrd == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
if (handle != 0) {
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
||||
if (exh != NULL) {
|
||||
taosWLockLatch(&exh->latch);
|
||||
if (exh->handle == NULL && exh->inited != 0) {
|
||||
SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx);
|
||||
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
tDebug("msg refId: %" PRId64 "", handle);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return 0;
|
||||
}
|
||||
exh->inited = 1;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
transReleaseExHandle(transGetRefMgt(), handle);
|
||||
}
|
||||
}
|
||||
SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx);
|
||||
|
||||
STraceId* trace = &pReq->info.traceId;
|
||||
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||
if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
|
||||
destroyCmsg(cliMsg);
|
||||
EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);
|
||||
if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) {
|
||||
destroyCmsg(pCliMsg);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
|
@ -2726,6 +2791,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
|||
int64_t transAllocHandle() {
|
||||
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
|
||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||
QUEUE_INIT(&exh->q);
|
||||
taosInitRWLatch(&exh->latch);
|
||||
tDebug("pre alloc refId %" PRId64 "", exh->refId);
|
||||
|
||||
return exh->refId;
|
||||
|
|
|
@ -761,9 +761,12 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
|
|||
tTrace("conn %p received release request", pConn);
|
||||
|
||||
STraceId traceId = pHead->traceId;
|
||||
pConn->status = ConnRelease;
|
||||
transClearBuffer(&pConn->readBuf);
|
||||
transFreeMsg(transContFromHead((char*)pHead));
|
||||
if (pConn->status != ConnAcquire) {
|
||||
return true;
|
||||
}
|
||||
pConn->status = ConnRelease;
|
||||
|
||||
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
|
||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||
|
@ -1090,6 +1093,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
|||
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
pConn->refId = exh->refId;
|
||||
QUEUE_INIT(&exh->q);
|
||||
transRefSrvHandle(pConn);
|
||||
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId);
|
||||
return pConn;
|
||||
|
@ -1121,6 +1125,7 @@ static int reallocConnRef(SSvrConn* conn) {
|
|||
exh->handle = conn;
|
||||
exh->pThrd = conn->hostThrd;
|
||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||
QUEUE_INIT(&exh->q);
|
||||
transAcquireExHandle(transGetRefMgt(), exh->refId);
|
||||
conn->refId = exh->refId;
|
||||
|
||||
|
|
|
@ -285,7 +285,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_IN_DROPPING, "Dnode in dropping sta
|
|||
// mnode-trans
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid transaction stage")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background")
|
||||
|
|
|
@ -15,6 +15,8 @@ sql use test3;
|
|||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1 from t1 state_window(a);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791213000,2,2,3,1.1);
|
||||
sql insert into t1 values(1648791215000,3,2,3,1.1);
|
||||
|
@ -214,4 +216,232 @@ if $data[29][1] != 2 then
|
|||
goto loop11
|
||||
endi
|
||||
|
||||
print step2=============
|
||||
|
||||
sql create database test4 vgroups 4;
|
||||
sql use test4;
|
||||
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams4 trigger at_once ignore expired 0 ignore update 0 into streamt4 as select _wstart, first(a), b, c, ta, tb from st interval(1s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791213000,2,3,4,1.1);
|
||||
sql insert into t2 values(1648791215000,3,4,5,1.1);
|
||||
sql insert into t2 values(1648791217000,4,5,6,1.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop12:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 1 select * from streamt4 order by 1;
|
||||
sql select * from streamt4 order by 1;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data02 != 2 then
|
||||
print ======data02=$data02
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data03 != 3 then
|
||||
print ======data03=$data03
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data04 != 1 then
|
||||
print ======data04=$data04
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data05 != 1 then
|
||||
print ======data05=$data05
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
|
||||
if $data22 != 4 then
|
||||
print ======data22=$data22
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data23 != 5 then
|
||||
print ======data23=$data23
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data24 != 2 then
|
||||
print ======data24=$data24
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data25 != 2 then
|
||||
print ======data25=$data25
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
print step3=============
|
||||
|
||||
sql create database test5 vgroups 4;
|
||||
sql use test5;
|
||||
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams5 trigger at_once ignore expired 0 ignore update 0 into streamt5 as select _wstart, b, c, ta, tb, max(b) from t1 interval(1s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791213000,2,3,4,1.1);
|
||||
sql insert into t1 values(1648791215000,3,4,5,1.1);
|
||||
sql insert into t1 values(1648791217000,4,5,6,1.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop13:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 1 select * from streamt5 order by 1;
|
||||
sql select * from streamt5 order by 1;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print ======data02=$data02
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data02 != 3 then
|
||||
print ======data03=$data03
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data03 != 1 then
|
||||
print ======data04=$data04
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data04 != 1 then
|
||||
print ======data05=$data05
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
|
||||
if $data21 != 4 then
|
||||
print ======data22=$data22
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data22 != 5 then
|
||||
print ======data23=$data23
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data23 != 1 then
|
||||
print ======data24=$data24
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data24 != 1 then
|
||||
print ======data25=$data25
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
print step4=============
|
||||
|
||||
sql create database test6 vgroups 4;
|
||||
sql use test6;
|
||||
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791213000,2,3,4,1.1);
|
||||
sql insert into t2 values(1648791215000,3,4,5,1.1);
|
||||
sql insert into t2 values(1648791217000,4,5,6,1.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop14:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 1 select * from streamt6 order by 1;
|
||||
sql select * from streamt6 order by 1;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print ======data02=$data02
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data02 != 3 then
|
||||
print ======data03=$data03
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data04 != 1 then
|
||||
print ======data04=$data04
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data05 != 1 then
|
||||
print ======data05=$data05
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
|
||||
if $data21 != 4 then
|
||||
print ======data22=$data22
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data22 != 5 then
|
||||
print ======data23=$data23
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data24 != 2 then
|
||||
print ======data24=$data24
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data25 != 2 then
|
||||
print ======data25=$data25
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -50,10 +50,11 @@ class TDTestCase:
|
|||
self.tbnum = 20
|
||||
self.rowNum = 10
|
||||
self.tag_dict = {
|
||||
't0':'int'
|
||||
't0':'int',
|
||||
't1':f'nchar({self.nchar_length})'
|
||||
}
|
||||
self.tag_values = [
|
||||
f'1'
|
||||
f'1', '""'
|
||||
]
|
||||
self.binary_str = 'taosdata'
|
||||
self.nchar_str = '涛思数据'
|
||||
|
@ -72,7 +73,7 @@ class TDTestCase:
|
|||
tdSql.execute(f'use {self.dbname}')
|
||||
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
|
||||
for i in range(self.tbnum):
|
||||
tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})")
|
||||
tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]}, {self.tag_values[1]})")
|
||||
self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum)
|
||||
def count_check(self):
|
||||
tdSql.query('select count(*) from information_schema.ins_tables')
|
||||
|
@ -313,6 +314,11 @@ class TDTestCase:
|
|||
tdSql.error('alter cluster "activeCode" ""')
|
||||
tdSql.execute('alter cluster "activeCode" "revoked"')
|
||||
|
||||
def test_query_ins_tags(self):
|
||||
sql = f'select tag_name, tag_value from information_schema.ins_tags where table_name = "{self.stbname}_0"'
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(2)
|
||||
|
||||
def run(self):
|
||||
self.prepare_data()
|
||||
self.count_check()
|
||||
|
@ -322,6 +328,7 @@ class TDTestCase:
|
|||
self.ins_stable_check2()
|
||||
self.ins_dnodes_check()
|
||||
self.ins_grants_check()
|
||||
self.test_query_ins_tags()
|
||||
|
||||
|
||||
def stop(self):
|
||||
|
|
Loading…
Reference in New Issue