fix(stream):extract delete msg from wal.
This commit is contained in:
parent
d585f34ea9
commit
5e16db4e19
|
@ -258,7 +258,7 @@ int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||||
int32_t tqNextBlockInWal(STqReader* pReader);
|
int32_t tqNextBlockInWal(STqReader* pReader);
|
||||||
bool tqNextBlockImpl(STqReader *pReader);
|
bool tqNextBlockImpl(STqReader *pReader);
|
||||||
|
|
||||||
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id);
|
||||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
||||||
|
|
|
@ -182,8 +182,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
int32_t tqStreamTasksScanWal(STQ* pTq);
|
int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
bool tqIsHandleExecuting(STqHandle* pHandle);
|
bool tqIsHandleExecuting(STqHandle* pHandle);
|
||||||
|
|
||||||
|
|
|
@ -213,7 +213,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
||||||
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
|
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
|
||||||
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
|
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -942,7 +942,66 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
|
||||||
|
SDecoder* pCoder = &(SDecoder){0};
|
||||||
|
SDeleteRes* pRes = &(SDeleteRes){0};
|
||||||
|
|
||||||
|
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
|
||||||
|
if (pRes->uidList == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderInit(pCoder, (uint8_t*)pData, len);
|
||||||
|
tDecodeDeleteRes(pCoder, pRes);
|
||||||
|
tDecoderClear(pCoder);
|
||||||
|
|
||||||
|
int32_t numOfTables = taosArrayGetSize(pRes->uidList);
|
||||||
|
if (numOfTables == 0 || pRes->affectedRows == 0) {
|
||||||
|
taosArrayDestroy(pRes->uidList);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||||
|
blockDataEnsureCapacity(pDelBlock, numOfTables);
|
||||||
|
pDelBlock->info.rows = numOfTables;
|
||||||
|
pDelBlock->info.version = ver;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTables; i++) {
|
||||||
|
// start key column
|
||||||
|
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
|
colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column
|
||||||
|
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
|
||||||
|
// uid column
|
||||||
|
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
|
int64_t* pUid = taosArrayGet(pRes->uidList, i);
|
||||||
|
colDataSetVal(pUidCol, i, (const char*)pUid, false);
|
||||||
|
|
||||||
|
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
|
||||||
|
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
|
||||||
|
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pRes->uidList);
|
||||||
|
|
||||||
|
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
|
||||||
|
*pRef = 1;
|
||||||
|
|
||||||
|
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||||
|
if (pRefBlock == NULL) {
|
||||||
|
taosMemoryFree(pRef);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||||
|
(*pRefBlock)->pBlock = pDelBlock;
|
||||||
|
(*pRefBlock)->dataRef = pRef;
|
||||||
|
atomic_add_fetch_32((*pRefBlock)->dataRef, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
bool failed = false;
|
bool failed = false;
|
||||||
SDecoder* pCoder = &(SDecoder){0};
|
SDecoder* pCoder = &(SDecoder){0};
|
||||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
SDeleteRes* pRes = &(SDeleteRes){0};
|
||||||
|
@ -962,6 +1021,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
taosArrayDestroy(pRes->uidList);
|
taosArrayDestroy(pRes->uidList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||||
blockDataEnsureCapacity(pDelBlock, sz);
|
blockDataEnsureCapacity(pDelBlock, sz);
|
||||||
pDelBlock->info.rows = sz;
|
pDelBlock->info.rows = sz;
|
||||||
|
|
|
@ -34,12 +34,12 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) {
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType == TDMT_VND_DELETE) {
|
if (msgType == TDMT_VND_DELETE) {
|
||||||
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
// tqProcessDeleteDataReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -301,25 +301,44 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
|
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
|
||||||
if (walNextValidMsg(pReader) < 0) {
|
int32_t code = walNextValidMsg(pReader);
|
||||||
return -1;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
|
||||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
|
||||||
int64_t ver = pReader->pHead->head.version;
|
int64_t ver = pReader->pHead->head.version;
|
||||||
|
|
||||||
void* data = taosMemoryMalloc(len);
|
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||||
if (data == NULL) {
|
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||||
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
|
void* data = taosMemoryMalloc(len);
|
||||||
return -1;
|
if (data == NULL) {
|
||||||
|
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(data, pBody, len);
|
||||||
|
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
|
||||||
|
|
||||||
|
*pItem = (SStreamQueueItem*)streamDataSubmitNew(data1, STREAM_INPUT__DATA_SUBMIT);
|
||||||
|
if (*pItem == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tqError("%s failed to create data submit for stream since out of memory", id);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
|
||||||
|
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
|
||||||
|
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
|
extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(data, pBody, len);
|
|
||||||
*pPackedData = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
} else {
|
} else {
|
||||||
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
if (currentVer != -1) {
|
if (currentVer == -1) {
|
||||||
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
@ -137,26 +137,24 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
// tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
|
|
||||||
SPackedData packData = {0};
|
SStreamQueueItem* pItem = NULL;
|
||||||
int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
|
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
|
// delete ignore
|
||||||
if (p == NULL) {
|
if (pItem == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
|
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
noNewDataInWal = false;
|
noNewDataInWal = false;
|
||||||
|
|
||||||
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
|
code = tqAddInputBlockNLaunchTask(pTask, pItem);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
|
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
|
||||||
|
@ -165,8 +163,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
|
tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamDataSubmitDestroy(p);
|
|
||||||
taosFreeQitem(p);
|
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,10 +26,10 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||||
return taosStrdup(buf);
|
return taosStrdup(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
|
||||||
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver);
|
tqError("s-task:%s failed to put into queue, too many, next ver:%" PRId64, pTask->id.idStr, /*pPackedData->ver*/ 0L);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,7 +87,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
|
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || pReader->pHead->head.msgType == TDMT_VND_DELETE ||
|
||||||
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
|
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
|
||||||
if (walFetchBodyNew(pReader) < 0) {
|
if (walFetchBodyNew(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue