fix invalid insert

This commit is contained in:
yihaoDeng 2023-08-31 15:28:31 +08:00
parent 9fd0bff673
commit 1833be028f
3 changed files with 18 additions and 11 deletions

View File

@ -423,6 +423,7 @@ void tFreeStreamTask(SStreamTask* pTask);
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId);
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
bool streamQueueIsFull(const STaosQueue* pQueue); bool streamQueueIsFull(const STaosQueue* pQueue);

View File

@ -236,29 +236,23 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
STqHandle handle; STqHandle handle;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
if (pHdr->type == SNAP_DATA_STREAM_TASK) { if (pHdr->type == SNAP_DATA_STREAM_TASK) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTaskId task = {0};
if (pTask == NULL) {
return -1;
}
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeStreamTask(&decoder, pTask);
code = tDecodeStreamTaskId(&decoder, &task);
if (code < 0) { if (code < 0) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
taosMemoryFree(pTask);
goto _err; goto _err;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
int64_t key[2] = {pTask->id.streamId, pTask->id.taskId}; int64_t key[2] = {task.streamId, task.taskId};
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) {
taosMemoryFree(pTask);
return -1; return -1;
} }
// mem leak or not ?
taosMemoryFree(pTask);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
// do nothing // do nothing
} }

View File

@ -251,6 +251,18 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId) {
int64_t ver;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
static void freeItem(void* p) { static void freeItem(void* p) {
SStreamContinueExecInfo* pInfo = p; SStreamContinueExecInfo* pInfo = p;
@ -538,4 +550,4 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
} }
qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
} }