fix transfer crash
This commit is contained in:
parent
edaa33d28d
commit
947775155b
|
@ -284,6 +284,8 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
|
||||||
|
|
||||||
pOldStream->status = pNewStream->status;
|
pOldStream->status = pNewStream->status;
|
||||||
pOldStream->updateTime = pNewStream->updateTime;
|
pOldStream->updateTime = pNewStream->updateTime;
|
||||||
|
pOldStream->checkpointId = pNewStream->checkpointId;
|
||||||
|
pOldStream->checkpointFreq = pNewStream->checkpointFreq;
|
||||||
|
|
||||||
taosWUnLockLatch(&pOldStream->lock);
|
taosWUnLockLatch(&pOldStream->lock);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1518,7 +1520,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
char dstStr[20] = {0};
|
char dstStr[20] = {0};
|
||||||
STR_TO_VARSTR(dstStr, sinkQuota)
|
STR_TO_VARSTR(dstStr, sinkQuota)
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
|
||||||
|
|
||||||
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
|
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
strcpy(scanHistoryIdle, "100a");
|
strcpy(scanHistoryIdle, "100a");
|
||||||
|
@ -1526,7 +1528,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
memset(dstStr, 0, tListLen(dstStr));
|
memset(dstStr, 0, tListLen(dstStr));
|
||||||
STR_TO_VARSTR(dstStr, scanHistoryIdle)
|
STR_TO_VARSTR(dstStr, scanHistoryIdle)
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
|
|
|
@ -106,7 +106,6 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
|
||||||
memcpy(pHdr->data, rowData, len);
|
memcpy(pHdr->data, rowData, len);
|
||||||
taosMemoryFree(rowData);
|
taosMemoryFree(rowData);
|
||||||
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
|
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
|
||||||
taosMemoryFree(rowData);
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
|
@ -288,7 +288,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
|
||||||
|
|
||||||
taosThreadMutexLock(&p->lock);
|
taosThreadMutexLock(&p->lock);
|
||||||
|
|
||||||
ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId &&
|
ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId &&
|
||||||
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
|
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
|
||||||
p->chkInfo.checkpointId = p->chkInfo.checkpointingId;
|
p->chkInfo.checkpointId = p->chkInfo.checkpointingId;
|
||||||
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
||||||
|
|
Loading…
Reference in New Issue