This commit is contained in:
yihaoDeng 2023-09-04 10:33:01 +08:00
parent d09f166b84
commit 03cddfa7ce
1 changed files with 4 additions and 0 deletions

View File

@ -248,10 +248,14 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
tDecoderClear(&decoder); tDecoderClear(&decoder);
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
int64_t key[2] = {task.streamId, task.taskId}; int64_t key[2] = {task.streamId, task.taskId};
taosWLockLatch(&pTq->pStreamMeta->lock);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) {
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
// do nothing // do nothing
} }