commit
6177c81091
|
@ -176,6 +176,7 @@ typedef struct {
|
||||||
char opername[TSDB_TRANS_OPER_LEN];
|
char opername[TSDB_TRANS_OPER_LEN];
|
||||||
SArray* pRpcArray;
|
SArray* pRpcArray;
|
||||||
SRWLatch lockRpcArray;
|
SRWLatch lockRpcArray;
|
||||||
|
int64_t mTraceId;
|
||||||
} STrans;
|
} STrans;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -52,6 +52,8 @@ typedef struct {
|
||||||
int32_t contLen;
|
int32_t contLen;
|
||||||
void *pCont;
|
void *pCont;
|
||||||
SSdbRaw *pRaw;
|
SSdbRaw *pRaw;
|
||||||
|
|
||||||
|
int64_t mTraceId;
|
||||||
} STransAction;
|
} STransAction;
|
||||||
|
|
||||||
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
|
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
|
||||||
|
|
|
@ -1417,7 +1417,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
|
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -1439,7 +1439,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq);
|
tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
||||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1475,7 +1475,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndTrimDb(pMnode, pDb);
|
code = mndTrimDb(pMnode, pDb, pReq);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
|
|
@ -440,7 +440,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
if (roleChanged) {
|
if (roleChanged) {
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||||
if (pDb != NULL && pDb->stateTs != curMs) {
|
if (pDb != NULL && pDb->stateTs != curMs) {
|
||||||
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs);
|
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
|
||||||
|
pDb->stateTs, curMs);
|
||||||
pDb->stateTs = curMs;
|
pDb->stateTs = curMs;
|
||||||
}
|
}
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
@ -954,7 +955,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
|
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
|
||||||
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
|
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
|
||||||
dcfgReq.config, dcfgReq.value);
|
dcfgReq.config, dcfgReq.value);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
|
||||||
tmsgSendReq(&epSet, &rpcMsg);
|
tmsgSendReq(&epSet, &rpcMsg);
|
||||||
code = 0;
|
code = 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -470,6 +470,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
action.mTraceId = pTrans->mTraceId;
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
action.msgType = TDMT_DND_CREATE_VNODE;
|
action.msgType = TDMT_DND_CREATE_VNODE;
|
||||||
|
@ -693,6 +694,8 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SMCreateSmaReq createReq = {0};
|
SMCreateSmaReq createReq = {0};
|
||||||
|
|
||||||
|
int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
|
||||||
|
|
||||||
if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -668,6 +668,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
}
|
}
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
action.mTraceId = pTrans->mTraceId;
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
|
@ -877,7 +878,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq);
|
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
||||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
|
|
@ -434,6 +434,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
action.mTraceId = pTrans->mTraceId;
|
||||||
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
|
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
|
||||||
action.pCont = buf;
|
action.pCont = buf;
|
||||||
action.contLen = tlen;
|
action.contLen = tlen;
|
||||||
|
|
|
@ -505,8 +505,8 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
||||||
mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
|
mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans,
|
||||||
pTrans->startFunc);
|
mndTransStr(pTrans->stage), pTrans->startFunc);
|
||||||
|
|
||||||
if (pTrans->startFunc > 0) {
|
if (pTrans->startFunc > 0) {
|
||||||
TransCbFp fp = mndTransGetCbFp(pTrans->startFunc);
|
TransCbFp fp = mndTransGetCbFp(pTrans->startFunc);
|
||||||
|
@ -577,8 +577,7 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
|
||||||
|
|
||||||
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
||||||
mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
|
mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
|
||||||
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
|
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), pNew->createdTime);
|
||||||
pNew->createdTime);
|
|
||||||
|
|
||||||
if (pOld->createdTime != pNew->createdTime) {
|
if (pOld->createdTime != pNew->createdTime) {
|
||||||
mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64
|
mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64
|
||||||
|
@ -650,6 +649,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
|
||||||
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||||
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||||
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
||||||
|
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0;
|
||||||
taosInitRWLatch(&pTrans->lockRpcArray);
|
taosInitRWLatch(&pTrans->lockRpcArray);
|
||||||
|
|
||||||
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
|
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
|
||||||
|
@ -706,7 +706,8 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
|
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
|
STransAction action = {
|
||||||
|
.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw, .mTraceId = pTrans->mTraceId};
|
||||||
return mndTransAppendAction(pTrans->redoActions, &action);
|
return mndTransAppendAction(pTrans->redoActions, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -728,6 +729,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
||||||
pAction->stage = TRN_STAGE_REDO_ACTION;
|
pAction->stage = TRN_STAGE_REDO_ACTION;
|
||||||
pAction->actionType = TRANS_ACTION_MSG;
|
pAction->actionType = TRANS_ACTION_MSG;
|
||||||
|
pAction->mTraceId = pTrans->mTraceId;
|
||||||
return mndTransAppendAction(pTrans->redoActions, pAction);
|
return mndTransAppendAction(pTrans->redoActions, pAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -875,7 +877,6 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (mndCheckTransConflict(pMnode, pTrans)) {
|
if (mndCheckTransConflict(pMnode, pTrans)) {
|
||||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
@ -912,6 +913,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||||
pNew->pRpcArray = pTrans->pRpcArray;
|
pNew->pRpcArray = pTrans->pRpcArray;
|
||||||
pNew->rpcRsp = pTrans->rpcRsp;
|
pNew->rpcRsp = pTrans->rpcRsp;
|
||||||
pNew->rpcRspLen = pTrans->rpcRspLen;
|
pNew->rpcRspLen = pTrans->rpcRspLen;
|
||||||
|
pNew->mTraceId = pTrans->mTraceId;
|
||||||
pTrans->pRpcArray = NULL;
|
pTrans->pRpcArray = NULL;
|
||||||
pTrans->rpcRsp = NULL;
|
pTrans->rpcRsp = NULL;
|
||||||
pTrans->rpcRspLen = 0;
|
pTrans->rpcRspLen = 0;
|
||||||
|
@ -1167,6 +1169,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
rpcMsg.info.traceId.rootId = pTrans->mTraceId;
|
||||||
|
|
||||||
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
||||||
|
|
||||||
char detail[1024] = {0};
|
char detail[1024] = {0};
|
||||||
|
@ -1457,7 +1461,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pTrans->code = 0;
|
pTrans->code = 0;
|
||||||
pTrans->stage = TRN_STAGE_FINISHED; // TRN_STAGE_PRE_FINISH is not necessary
|
pTrans->stage = TRN_STAGE_FINISHED; // TRN_STAGE_PRE_FINISH is not necessary
|
||||||
mInfo("trans:%d, stage from commitAction to finished", pTrans->id);
|
mInfo("trans:%d, stage from commitAction to finished", pTrans->id);
|
||||||
continueExec = true;
|
continueExec = true;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -15,6 +15,10 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
|
// 0: not init
|
||||||
|
// 1: already inited
|
||||||
|
// 2: wait to be inited or cleaup
|
||||||
|
|
||||||
int32_t tqInit() {
|
int32_t tqInit() {
|
||||||
int8_t old;
|
int8_t old;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -213,12 +217,14 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
if (tqDebugFlag & DEBUG_DEBUG) {
|
||||||
char buf2[80] = {0};
|
char buf1[80] = {0};
|
||||||
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
||||||
|
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -271,13 +277,14 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
};
|
};
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
if (tqDebugFlag & DEBUG_DEBUG) {
|
||||||
char buf2[80] = {0};
|
char buf1[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s",
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode),
|
||||||
|
pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,12 +339,14 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
|
||||||
};
|
};
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
if (tqDebugFlag & DEBUG_DEBUG) {
|
||||||
char buf2[80] = {0};
|
char buf1[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||||
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -497,7 +506,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// update epoch if need
|
// update epoch if need
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
while (savedEpoch < reqEpoch) {
|
while (savedEpoch < reqEpoch) {
|
||||||
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
|
||||||
|
reqEpoch);
|
||||||
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
|
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -602,7 +612,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||||
|
", ts:%" PRId64 "",
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||||
|
|
||||||
|
@ -612,7 +623,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// for taosx
|
// for taosx
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, &req);
|
tqInitTaosxRsp(&taosxRsp, &req);
|
||||||
|
|
||||||
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
||||||
|
@ -887,14 +898,15 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId,
|
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
|
||||||
oldConsumerId);
|
pHandle->consumerId, oldConsumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO handle qmsg and exec modification
|
// TODO handle qmsg and exec modification
|
||||||
tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId);
|
tqInfo("update the consumer info, old consumer id:0x%" PRIx64 ", new Id:0x%" PRIx64, pHandle->consumerId,
|
||||||
|
req.newConsumerId);
|
||||||
atomic_store_32(&pHandle->epoch, -1);
|
atomic_store_32(&pHandle->epoch, -1);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
|
@ -983,9 +995,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->tbSink.vnode = pTq->pVnode;
|
pTask->tbSink.vnode = pTq->pVnode;
|
||||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
||||||
|
|
||||||
int32_t version = 1;
|
int32_t version = 1;
|
||||||
SMetaInfo info = {0};
|
SMetaInfo info = {0};
|
||||||
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
version = info.skmVer;
|
version = info.skmVer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
|
||||||
|
int32_t code = -1;
|
||||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -33,7 +34,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
}
|
}
|
||||||
|
|
||||||
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
|
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
|
||||||
taosMulModeMkDir(streamPath, 0755);
|
code = taosMulModeMkDir(streamPath, 0755);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
taosMemoryFree(streamPath);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
taosMemoryFree(streamPath);
|
taosMemoryFree(streamPath);
|
||||||
|
|
||||||
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
||||||
|
|
Loading…
Reference in New Issue