refactor: do some internal refactor.
This commit is contained in:
parent
c36c5253c5
commit
4fd3064fd9
|
@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
|
|||
void tqClose(STQ*);
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
|
||||
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
||||
int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
||||
|
||||
int tqCommit(STQ*);
|
||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||
|
|
|
@ -223,19 +223,6 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
|
|||
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
A(!pRsp->withSchema);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
}
|
||||
#endif
|
||||
|
||||
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
|
||||
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
|
||||
|
||||
|
@ -895,7 +882,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
atomic_store_32(&pHandle->epoch, -1);
|
||||
|
||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||
tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
|
||||
tqUnregisterPushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
|
||||
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
|
|
|
@ -62,6 +62,8 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
|
|||
}
|
||||
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||
const int32_t MAX_ROWS_TO_RETURN = 4096;
|
||||
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
@ -82,7 +84,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
}
|
||||
}
|
||||
|
||||
int32_t rowCnt = 0;
|
||||
int32_t totalRows = 0;
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
|
@ -94,9 +97,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, vgId, pDataBlock);
|
||||
|
||||
// current scan should be stopped asap, since the rebalance occurs.
|
||||
// current scan should be stopped ASAP, since the re-balance occurs.
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -104,9 +105,12 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
||||
pRsp->blockNum++;
|
||||
|
||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId,
|
||||
pDataBlock->info.rows, pRsp->blockNum);
|
||||
|
||||
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
rowCnt += pDataBlock->info.rows;
|
||||
if (rowCnt >= 4096) {
|
||||
totalRows += pDataBlock->info.rows;
|
||||
if (totalRows >= MAX_ROWS_TO_RETURN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -127,6 +131,9 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d, rows:%d", vgId, pHandle->consumerId,
|
||||
pRsp->blockNum, totalRows);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -206,6 +206,87 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
}
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
void* pKey;
|
||||
int64_t keyLen;
|
||||
} SItem;
|
||||
|
||||
static void recordPushedEntry(SArray* cachedKey, void* pIter);
|
||||
|
||||
static void freeItem(void* param) {
|
||||
SItem* p = (SItem*) param;
|
||||
taosMemoryFree(p->pKey);
|
||||
}
|
||||
|
||||
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
size_t numOfKeys = taosArrayGetSize(pCachedKeys);
|
||||
|
||||
for (int32_t i = 0; i < numOfKeys; i++) {
|
||||
SItem* pItem = taosArrayGet(pCachedKeys, i);
|
||||
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
|
||||
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey);
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfKeys > 0) {
|
||||
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
|
||||
}
|
||||
}
|
||||
|
||||
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
|
||||
int32_t dataLen, SArray* pCachedKey) {
|
||||
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
||||
|
||||
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||
if (pRsp->reqOffset.version >= ver) {
|
||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
|
||||
pRsp->reqOffset.version, ver);
|
||||
return;
|
||||
}
|
||||
|
||||
qTaskInfo_t pTaskInfo = pExec->task;
|
||||
|
||||
// prepare scan mem data
|
||||
SPackedData submit = {
|
||||
.msgStr = pData,
|
||||
.msgLen = dataLen,
|
||||
.ver = ver,
|
||||
};
|
||||
|
||||
if(qStreamSetScanMemData(pTaskInfo, submit) != 0){
|
||||
return;
|
||||
}
|
||||
|
||||
// here start to scan submit block to extract the subscribed data
|
||||
int32_t totalRows = 0;
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) {
|
||||
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
|
||||
}
|
||||
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
||||
pRsp->blockNum++;
|
||||
totalRows += pDataBlock->info.rows;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum,
|
||||
totalRows);
|
||||
|
||||
if (pRsp->blockNum > 0) {
|
||||
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
||||
tqPushDataRsp(pTq, pPushEntry);
|
||||
recordPushedEntry(pCachedKey, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
||||
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
||||
|
@ -220,24 +301,19 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
||||
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
||||
|
||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||
|
||||
void* data = taosMemoryMalloc(len);
|
||||
void* data = taosMemoryMalloc(len);
|
||||
if (data == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tqError("failed to copy data for stream since out of memory");
|
||||
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
||||
taosArrayDestroy(cachedKeyLens);
|
||||
|
||||
// unlock
|
||||
tqError("failed to copy data for stream since out of memory, vgId:%d, consumer:0x%"PRIx64, vgId);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(data, pReq, len);
|
||||
|
||||
void* pIter = NULL;
|
||||
SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
|
||||
void* pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||
if (pIter == NULL) {
|
||||
|
@ -248,83 +324,28 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
|
||||
continue;
|
||||
}
|
||||
|
||||
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||
if (pRsp->reqOffset.version >= ver) {
|
||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
|
||||
pRsp->reqOffset.version, ver);
|
||||
tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, pPushEntry->subKey);
|
||||
continue;
|
||||
}
|
||||
|
||||
STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
||||
// prepare scan mem data
|
||||
SPackedData submit = {
|
||||
.msgStr = data,
|
||||
.msgLen = len,
|
||||
.ver = ver,
|
||||
};
|
||||
if(qStreamSetScanMemData(task, submit) != 0){
|
||||
continue;
|
||||
}
|
||||
|
||||
// here start to scan submit block to extract the subscribed data
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
|
||||
}
|
||||
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
||||
pRsp->blockNum++;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum);
|
||||
if (pRsp->blockNum > 0) {
|
||||
// set offset
|
||||
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
||||
|
||||
// remove from hash
|
||||
size_t kLen;
|
||||
void* key = taosHashGetKey(pIter, &kLen);
|
||||
void* keyCopy = taosMemoryCalloc(1, kLen + 1);
|
||||
memcpy(keyCopy, key, kLen);
|
||||
|
||||
taosArrayPush(cachedKeys, &keyCopy);
|
||||
taosArrayPush(cachedKeyLens, &kLen);
|
||||
|
||||
tqPushDataRsp(pTq, pPushEntry);
|
||||
}
|
||||
doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
|
||||
}
|
||||
|
||||
// delete entry
|
||||
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
|
||||
void* key = taosArrayGetP(cachedKeys, i);
|
||||
size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i);
|
||||
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
|
||||
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
||||
taosArrayDestroy(cachedKeyLens);
|
||||
doRemovePushedEntry(cachedKey, pTq);
|
||||
taosArrayDestroyEx(cachedKey, freeItem);
|
||||
taosMemoryFree(data);
|
||||
}
|
||||
|
||||
// unlock
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
}
|
||||
|
||||
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
|
||||
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (msgType == TDMT_VND_SUBMIT) {
|
||||
void* data = taosMemoryMalloc(len);
|
||||
if (data == NULL) {
|
||||
|
@ -351,6 +372,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void recordPushedEntry(SArray* cachedKey, void* pIter) {
|
||||
size_t kLen = 0;
|
||||
void* key = taosHashGetKey(pIter, &kLen);
|
||||
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
|
||||
taosArrayPush(cachedKey, &item);
|
||||
}
|
||||
|
||||
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
|
||||
SMqDataRsp* pDataRsp, int32_t type) {
|
||||
uint64_t consumerId = pRequest->consumerId;
|
||||
|
@ -388,8 +416,8 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
|
||||
|
||||
if (pEntry != NULL) {
|
||||
|
|
Loading…
Reference in New Issue