Merge pull request #16103 from taosdata/feature/stream

refactor(stream): remove unneccessary param in converting data
This commit is contained in:
Liu Jicong 2022-08-15 13:26:46 +08:00 committed by GitHub
commit b9c9ea5974
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 40 additions and 36 deletions

View File

@ -275,12 +275,8 @@ typedef struct SStreamTask {
int32_t nodeId; int32_t nodeId;
SEpSet epSet; SEpSet epSet;
// used for task source and sink,
// while task agg should have processedVer for each child
int64_t recoverSnapVer; int64_t recoverSnapVer;
int64_t startVer; int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>

View File

@ -381,6 +381,9 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
} }
#endif #endif
taosMemoryFree(pParam->pOffset);
if (pBuf->pData) taosMemoryFree(pBuf->pData);
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId, /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
* pOffset->version);*/ * pOffset->version);*/
@ -402,6 +405,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
tsem_post(&pParamSet->rspSem); tsem_post(&pParamSet->rspSem);
} }
taosMemoryFree(pParamSet);
#if 0 #if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
@ -611,12 +616,12 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
} }
} }
#if 0
if (!async) { if (!async) {
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
#endif #endif
}
return 0; return 0;
} }
@ -1216,6 +1221,7 @@ END:
} else { } else {
taosMemoryFree(pParam); taosMemoryFree(pParam);
} }
taosMemoryFree(pMsg->pData);
return code; return code;
} }

View File

@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c // vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq // tq
int tqInit(); int tqInit();
@ -169,10 +170,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq); const char* stbFullName, SBatchDeleteReq* pDeleteReq);
// sma // sma
int32_t smaInit(); int32_t smaInit();

View File

@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
if (!pSubmitReq) { if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),

View File

@ -14,6 +14,7 @@
*/ */
#include "tq.h" #include "tq.h"
#include "vnd.h"
#if 0 #if 0
void tqTmrRspFunc(void* param, void* tmrId) { void tqTmrRspFunc(void* param, void* tmrId) {
@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
walApplyVer(pTq->pVnode->pWal, ver); if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(msgLen);

View File

@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) { for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/ int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
int64_t groupId = 0;
char* name = buildCtbNameByGroupId(stbFullName, groupId); char* name = buildCtbNameByGroupId(stbFullName, groupId);
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name); tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0}; SMetaReader mr = {0};
@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) { int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL; SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL; SArray* schemaReqSz = NULL;
@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// assign data // assign data
// TODO // TODO
ret = rpcMallocCont(cap); ret = rpcMallocCont(cap);
ret->header.vgId = vgId; ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq); ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz); ret->numOfBlocks = htonl(sz);
@ -234,8 +233,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq); pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);

View File

@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;

View File

@ -764,6 +764,8 @@ void vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
bool vnodeIsLeader(SVnode *pVnode) { bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) { if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),