Merge branch '3.3.6' into merge/3.0to3.3.6

This commit is contained in:
Simon Guan 2025-03-24 13:46:30 +08:00
commit 72081c980e
9 changed files with 143 additions and 33 deletions

View File

@ -79,10 +79,12 @@ typedef struct SStreamTaskNodeUpdateMsg {
int64_t streamId;
int32_t taskId;
SArray* pNodeList; // SArray<SNodeUpdateInfo>
SArray* pTaskList; // SArray<int32_t>, taskId list
} SStreamTaskNodeUpdateMsg;
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg);
typedef struct {
int64_t reqId;

View File

@ -522,6 +522,7 @@ typedef struct STaskStartInfo {
typedef struct STaskUpdateInfo {
SHashObj* pTasks;
SArray* pTaskList;
int32_t activeTransId;
int32_t completeTransId;
int64_t completeTs;

View File

@ -1853,6 +1853,9 @@ int32_t doProcessMsgFromServer(void* param) {
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int32_t code = 0;
SEpSet* tEpSet = NULL;
tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
if (pEpSet != NULL) {
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
if (NULL == tEpSet) {
@ -1894,6 +1897,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
goto _exit;
}
return;
_exit:
tscError("failed to sched msg to tsc since %s", tstrerror(code));
code = doProcessMsgFromServerImpl(pMsg, tEpSet);

View File

@ -143,6 +143,18 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
// todo this new attribute will be result in being incompatible with previous version
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));
for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pId = taosArrayGet(pMsg->pTaskList, i);
if (pId == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)pId));
}
tEndEncode(pEncoder);
_exit:
if (code) {
@ -162,10 +174,10 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
int32_t size = 0;
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
if (pMsg->pNodeList == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo info = {0};
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
@ -179,11 +191,33 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
// number of tasks
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
pMsg->pTaskList = taosArrayInit(size, sizeof(int32_t));
if (pMsg->pTaskList == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < size; ++i) {
int32_t id = 0;
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &id));
if (taosArrayPush(pMsg->pTaskList, &id) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
}
tEndDecode(pDecoder);
_exit:
return code;
}
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
taosArrayDestroy(pMsg->pNodeList);
taosArrayDestroy(pMsg->pTaskList);
pMsg->pNodeList = NULL;
pMsg->pTaskList = NULL;
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
int32_t code = 0;
int32_t lino;

View File

@ -145,7 +145,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
return 0;
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SArray* pTaskList, SStreamTaskId *pId,
int32_t transId) {
int32_t code = 0;
@ -158,6 +158,8 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang
code = terrno;
}
pMsg->pTaskList = pTaskList;
if (code == 0) {
void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
if (p == NULL) {
@ -166,10 +168,10 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang
}
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, SArray* pList, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
initNodeUpdateMsg(&req, pInfo, pList, pId, transId);
int32_t code = 0;
int32_t blen;
@ -177,7 +179,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
tDestroyNodeUpdateMsg(&req);
return terrno;
}
@ -185,7 +187,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
taosArrayDestroy(req.pNodeList);
tDestroyNodeUpdateMsg(&req);
return terrno;
}
@ -196,7 +198,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
if (code == -1) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
taosArrayDestroy(req.pNodeList);
tDestroyNodeUpdateMsg(&req);
return code;
}
@ -209,7 +211,27 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
tDestroyNodeUpdateMsg(&req);
return TSDB_CODE_SUCCESS;
}
// todo: set the task id list for a given nodeId
static int32_t createUpdateTaskList(int32_t vgId, SArray* pList) {
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) {
continue;
}
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pe->nodeId == vgId) {
void *pRet = taosArrayPush(pList, &pe->id.taskId);
if (pRet == NULL) {
return terrno;
}
}
}
return TSDB_CODE_SUCCESS;
}
@ -218,11 +240,23 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask
int32_t len = 0;
SEpSet epset = {0};
bool hasEpset = false;
SArray* pTaskList = taosArrayInit(4, sizeof(int32_t));
if (pTaskList == NULL) {
return terrno;
}
bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
int32_t code = createUpdateTaskList(pTask->info.nodeId, pTaskList);
if (code != 0) {
taosArrayDestroy(pTaskList);
return code;
}
// pTaskList already freed here
code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTaskList,pTask->info.nodeId, &pTask->id, pTrans->id);
if (code) {
mError("failed to build stream task epset update msg, code:%s", tstrerror(code));
taosMemoryFree(pBuf);
return code;
}

View File

@ -170,29 +170,29 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
// this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t st = taosGetTimestampMs();
bool updated = false;
int32_t code = 0;
SStreamTask* pTask = NULL;
SStreamTask* pHTask = NULL;
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t st = taosGetTimestampMs();
bool updated = false;
int32_t code = 0;
SStreamTask* pTask = NULL;
SStreamTask* pHTask = NULL;
SStreamTaskNodeUpdateMsg req = {0};
SDecoder decoder;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
code = tDecodeStreamTaskUpdateMsg(&decoder, &req);
tDecoderClear(&decoder);
if (code < 0) {
rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
tDecoderClear(&decoder);
tDestroyNodeUpdateMsg(&req);
return rsp.code;
}
tDecoderClear(&decoder);
int32_t gError = streamGetFatalError(pMeta);
if (gError != 0) {
tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),

View File

@ -219,6 +219,9 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
return;
}
qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);
taosMemoryFreeClear(pMsgBody->target.dbFName);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
if (pMsgBody->paramFreeFp) {

View File

@ -660,7 +660,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
qError("calloc SMsgSendInfo size %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(terrno);
}
@ -672,8 +672,12 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
if (pJob) {
msgSendInfo->requestId = pJob->conn.requestId;
msgSendInfo->requestObjRefId = pJob->conn.requestObjRefId;
} else {
SCH_ERR_JRET(taosGetSystemUUIDU64(&msgSendInfo->requestId));
}
qDebug("ahandle %p alloced, QID:0x%" PRIx64, msgSendInfo, msgSendInfo->requestId);
if (TDMT_SCH_LINK_BROKEN != msgType) {
msgSendInfo->msgInfo.pData = msg;
msgSendInfo->msgInfo.len = msgSize;

View File

@ -382,6 +382,31 @@ void streamMetaRemoveDB(void* arg, char* key) {
streamMutexUnlock(&pMeta->backendMutex);
}
int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pInfo->pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pInfo->pTasks == NULL) {
return terrno;
}
pInfo->pTaskList = taosArrayInit(4, sizeof(int32_t));
if (pInfo->pTaskList == NULL) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
taosHashCleanup(pInfo->pTasks);
taosArrayDestroy(pInfo->pTaskList);
pInfo->pTasks = NULL;
pInfo->pTaskList = NULL;
}
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
QRY_PARAM_CHECK(p);
@ -435,8 +460,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno);
code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
TSDB_CHECK_CODE(code, lino, _err);
code = streamMetaInitStartInfo(&pMeta->startInfo);
TSDB_CHECK_CODE(code, lino, _err);
@ -641,8 +666,8 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->updateInfo.pTasks);
streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
streamMetaClearStartInfo(&pMeta->startInfo);
destroyMetaHbInfo(pMeta->pHbInfo);
@ -1492,16 +1517,19 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
STaskUpdateInfo* pInfo = &pMeta->updateInfo;
int32_t num = taosArrayGetSize(pInfo->pTaskList);
taosHashClear(pInfo->pTasks);
taosArrayClear(pInfo->pTaskList);
int32_t prev = pInfo->completeTransId;
pInfo->completeTransId = pInfo->activeTransId;
pInfo->activeTransId = -1;
pInfo->completeTs = taosGetTimestampMs();
stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId",
pMeta->vgId, pInfo->completeTs, prev, pInfo->completeTransId);
stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64
", complete transId:%d->%d, update Tasks:%d reset active transId",
pMeta->vgId, pInfo->completeTs, prev, pInfo->completeTransId, num);
}
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {