refactor: do some internal refactor.
This commit is contained in:
parent
dbb1e9bd59
commit
bbb571a383
|
@ -217,8 +217,8 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
|
@ -347,12 +347,14 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf
|
||||||
|
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
STqOffset offset = {0};
|
STqOffset offset = {0};
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
|
tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
|
||||||
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
@ -365,13 +367,16 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
offset.val.version += 1;
|
offset.val.version += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
tqError("invalid commit offset type:%d", offset.val.type);
|
||||||
}
|
return -1;
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
|
||||||
if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||||
|
if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
|
||||||
|
return 0; // no need to update the offset value
|
||||||
|
}
|
||||||
|
|
||||||
|
// save the new offset value
|
||||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -379,27 +384,25 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
||||||
if (pHandle) {
|
if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
|
||||||
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
return -1;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// rsp
|
|
||||||
|
|
||||||
/*}*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
|
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
|
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
|
||||||
|
|
||||||
if (pCheck->ntbUid == tbUid) {
|
if (pCheck->ntbUid == tbUid) {
|
||||||
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
@ -411,6 +414,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -455,6 +459,7 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
||||||
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
|
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -594,8 +599,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||||
", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||||
|
|
||||||
|
@ -618,8 +622,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
||||||
",version:%" PRId64 "",
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||||
metaRsp.rspOffset.version);
|
metaRsp.rspOffset.version);
|
||||||
taosMemoryFree(metaRsp.metaRsp);
|
taosMemoryFree(metaRsp.metaRsp);
|
||||||
|
@ -637,8 +641,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
fetchOffsetNew = taosxRsp.rspOffset;
|
fetchOffsetNew = taosxRsp.rspOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("taosx poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||||
",version:%" PRId64 "",
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
|
||||||
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
|
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
|
||||||
}
|
}
|
||||||
|
@ -715,6 +719,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -996,7 +1001,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
|
||||||
tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||||
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
|
@ -1040,7 +1045,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||||
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
|
||||||
|
@ -1065,6 +1070,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||||
code = tDecodeSStreamTask(&decoder, pTask);
|
code = tDecodeSStreamTask(&decoder, pTask);
|
||||||
|
@ -1365,7 +1371,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
||||||
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||||
|
|
Loading…
Reference in New Issue