Merge pull request #17440 from taosdata/feature/stream
refactor: tmq commit
This commit is contained in:
commit
936c78f424
|
@ -210,6 +210,8 @@ typedef struct {
|
|||
typedef struct {
|
||||
SMqCommitCbParamSet* params;
|
||||
STqOffset* pOffset;
|
||||
/*char topicName[TSDB_TOPIC_FNAME_LEN];*/
|
||||
/*int32_t vgId;*/
|
||||
} SMqCommitCbParam;
|
||||
|
||||
tmq_conf_t* tmq_conf_new() {
|
||||
|
@ -407,6 +409,14 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
|
||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||
ASSERT(waitingRspNum >= 0);
|
||||
if (waitingRspNum == 0) {
|
||||
tmqCommitDone(pParamSet);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
||||
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
|
||||
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
|
||||
|
@ -420,18 +430,13 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
#endif
|
||||
|
||||
taosMemoryFree(pParam->pOffset);
|
||||
if (pBuf->pData) taosMemoryFree(pBuf->pData);
|
||||
taosMemoryFree(pBuf->pData);
|
||||
|
||||
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
|
||||
* pOffset->version);*/
|
||||
|
||||
// count down waiting rsp
|
||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||
ASSERT(waitingRspNum >= 0);
|
||||
tmqCommitRspCountDown(pParamSet);
|
||||
|
||||
if (waitingRspNum == 0) {
|
||||
tmqCommitDone(pParamSet);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -591,14 +596,10 @@ FAIL:
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
|
||||
void* userParam) {
|
||||
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
|
||||
void* userParam) {
|
||||
int32_t code = -1;
|
||||
|
||||
if (msg != NULL) {
|
||||
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
|
||||
}
|
||||
|
||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||
if (pParamSet == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -646,33 +647,37 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
|
|||
}
|
||||
}
|
||||
|
||||
// no request is sent
|
||||
if (pParamSet->totalRspNum == 0) {
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||
ASSERT(waitingRspNum >= 0);
|
||||
if (waitingRspNum == 0) {
|
||||
tmqCommitDone(pParamSet);
|
||||
}
|
||||
// count down since waiting rsp num init as 1
|
||||
tmqCommitRspCountDown(pParamSet);
|
||||
|
||||
if (!async) {
|
||||
tsem_wait(&pParamSet->rspSem);
|
||||
code = pParamSet->rspErr;
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (!async) {
|
||||
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
||||
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
|
||||
void* userParam) {
|
||||
if (msg) {
|
||||
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
|
||||
} else {
|
||||
return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
|
||||
}
|
||||
}
|
||||
|
||||
void tmqAssignAskEpTask(void* param, void* tmrId) {
|
||||
|
|
|
@ -149,7 +149,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
|||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||
|
||||
// tqRead
|
||||
int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||
|
||||
|
@ -181,8 +181,8 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
|
|||
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
|
||||
|
||||
// tqSink
|
||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
|
||||
// tqOffset
|
||||
char* tqOffsetBuildFName(const char* path, int32_t ver);
|
||||
|
|
|
@ -595,7 +595,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tqInitTaosxRsp(&taosxRsp, pReq);
|
||||
|
||||
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
||||
tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
|
||||
tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
|
||||
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||
|
@ -924,7 +924,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
pTask->smaSink.smaSink = smaHandleRes;
|
||||
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||
pTask->tbSink.vnode = pTq->pVnode;
|
||||
pTask->tbSink.tbSinkFunc = tqTableSink1;
|
||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
|
||||
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||
|
|
|
@ -123,7 +123,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
||||
|
|
|
@ -284,7 +284,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
return ret;
|
||||
}
|
||||
|
||||
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pBlocks = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
int64_t suid = pTask->tbSink.stbUid;
|
||||
|
@ -528,7 +528,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
taosArrayDestroy(tagArray);
|
||||
}
|
||||
|
||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pRes = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
SBatchDeleteReq deleteReq = {0};
|
||||
|
|
Loading…
Reference in New Issue