Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
This commit is contained in:
commit
a750eb9699
|
@ -16,6 +16,8 @@
|
||||||
#define _STREAM_BACKEDN_SNAPSHOT_H_
|
#define _STREAM_BACKEDN_SNAPSHOT_H_
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
|
||||||
|
#define STREAM_STATE_TRANSFER "stream-state-transfer"
|
||||||
|
|
||||||
typedef struct SStreamSnapReader SStreamSnapReader;
|
typedef struct SStreamSnapReader SStreamSnapReader;
|
||||||
typedef struct SStreamSnapWriter SStreamSnapWriter;
|
typedef struct SStreamSnapWriter SStreamSnapWriter;
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ struct SStreamStateReader {
|
||||||
TBC* pCur;
|
TBC* pCur;
|
||||||
|
|
||||||
SStreamSnapReader* pReaderImpl;
|
SStreamSnapReader* pReaderImpl;
|
||||||
int32_t complete;
|
int32_t complete; // open reader or not
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
|
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
|
||||||
|
@ -60,26 +60,29 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
||||||
}
|
}
|
||||||
pReader->pReaderImpl = pSnapReader;
|
pReader->pReaderImpl = pSnapReader;
|
||||||
|
|
||||||
tqDebug("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode));
|
tqDebug("vgId:%d, vnode %s snapshot reader opened", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d, vnode stream-state snapshot reader failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
|
||||||
|
tstrerror(code));
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
|
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
tqDebug("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode));
|
tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
streamSnapReaderClose(pReader->pReaderImpl);
|
streamSnapReaderClose(pReader->pReaderImpl);
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
|
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
|
||||||
|
tqDebug("vgId:%d, vnode %s snapshot read data", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (pReader->complete == 0) {
|
if (pReader->complete == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -143,13 +146,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir);
|
tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tdir);
|
||||||
pWriter->pWriterImpl = pSnapWriter;
|
pWriter->pWriterImpl = pSnapWriter;
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return code;
|
return code;
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
|
||||||
|
tstrerror(code));
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -157,16 +161,18 @@ _err:
|
||||||
|
|
||||||
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
|
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
tqDebug("vgId:%d, vnode stream-state snapshot writer closed", TD_VID(pWriter->pTq->pVnode));
|
tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
|
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
|
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
|
||||||
|
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
|
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
code = streamStateLoadTasks(pWriter);
|
code = streamStateLoadTasks(pWriter);
|
||||||
}
|
}
|
||||||
|
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -174,6 +180,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
|
||||||
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); }
|
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); }
|
||||||
|
|
||||||
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode));
|
tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
}
|
}
|
||||||
|
|
|
@ -838,6 +838,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
|
||||||
if (hasPrevWindow(pFillSup)) {
|
if (hasPrevWindow(pFillSup)) {
|
||||||
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
||||||
pFillInfo->pos = FILL_POS_END;
|
pFillInfo->pos = FILL_POS_END;
|
||||||
|
resetFillWindow(&pFillSup->next);
|
||||||
pFillSup->next.key = pFillSup->cur.key;
|
pFillSup->next.key = pFillSup->cur.key;
|
||||||
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
|
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
|
||||||
pFillInfo->preRowKey = INT64_MIN;
|
pFillInfo->preRowKey = INT64_MIN;
|
||||||
|
|
|
@ -1535,6 +1535,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
|
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
|
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
@ -1874,6 +1875,31 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
|
||||||
return winNum;
|
return winNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin) {
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SResultRow* pCurResult = NULL;
|
||||||
|
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
// Just look for the window behind StartIndex
|
||||||
|
while (1) {
|
||||||
|
SResultWindowInfo winInfo = {0};
|
||||||
|
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo);
|
||||||
|
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) ||
|
||||||
|
!inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) {
|
||||||
|
taosMemoryFree(winInfo.pOutputBuf);
|
||||||
|
pAPI->stateStore.streamStateFreeCur(pCur);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
|
||||||
|
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
|
||||||
|
pAPI->stateStore.streamStateFreeCur(pCur);
|
||||||
|
taosMemoryFree(winInfo.pOutputBuf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
|
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
|
||||||
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
|
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
|
||||||
&pAggSup->stateStore);
|
&pAggSup->stateStore);
|
||||||
|
@ -2081,6 +2107,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
|
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
|
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2089,6 +2116,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
|
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
|
||||||
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2099,7 +2127,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||||
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
|
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
|
||||||
saveResult(parentWin, pStUpdated);
|
saveResult(parentWin, pStUpdated);
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
} else {
|
} else {
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2474,13 +2504,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
|
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
|
||||||
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
|
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
|
||||||
resSize);
|
resSize);
|
||||||
}
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.releaseStreamStateFn) {
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
downstream->fpSet.releaseStreamStateFn(downstream);
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
||||||
|
@ -2492,6 +2520,33 @@ void resetWinRange(STimeWindow* winRange) {
|
||||||
winRange->ekey = INT64_MAX;
|
winRange->ekey = INT64_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
resetWinRange(&pAggSup->winRange);
|
||||||
|
|
||||||
|
SResultWindowInfo winInfo = {0};
|
||||||
|
int32_t size = 0;
|
||||||
|
void* pBuf = NULL;
|
||||||
|
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
|
||||||
|
int32_t num = size / sizeof(SSessionKey);
|
||||||
|
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
|
||||||
|
ASSERT(size == num * sizeof(SSessionKey));
|
||||||
|
for (int32_t i = 0; i < num; i++) {
|
||||||
|
SResultWindowInfo winInfo = {0};
|
||||||
|
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
|
||||||
|
compactSessionSemiWindow(pOperator, &winInfo);
|
||||||
|
saveSessionOutputBuf(pAggSup, &winInfo);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
@ -2731,6 +2786,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||||
pInfo->pStUpdated = NULL;
|
pInfo->pStUpdated = NULL;
|
||||||
|
|
||||||
|
if(pInfo->isHistoryOp) {
|
||||||
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
|
}
|
||||||
|
|
||||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
@ -2763,6 +2823,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
||||||
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
|
||||||
}
|
}
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
|
@ -2801,6 +2862,7 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
@ -2814,6 +2876,7 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||||
|
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||||
blockDataDestroy(pInfo->pCheckpointRes);
|
blockDataDestroy(pInfo->pCheckpointRes);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
|
|
|
@ -137,5 +137,6 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
void* val, int32_t vlen, int64_t ttl, void* tmpBuf);
|
void* val, int32_t vlen, int64_t ttl, void* tmpBuf);
|
||||||
|
|
||||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
|
||||||
|
int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
|
||||||
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
|
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
|
||||||
#endif
|
#endif
|
|
@ -738,6 +738,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
||||||
taosMemoryFree(chkpPath);
|
taosMemoryFree(chkpPath);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
taosArrayClear(pMeta->chkpSaved);
|
||||||
|
|
||||||
TdDirPtr pDir = taosOpenDir(chkpPath);
|
TdDirPtr pDir = taosOpenDir(chkpPath);
|
||||||
|
|
||||||
|
@ -791,12 +792,6 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
|
||||||
if (wrapper->pHandle[i]) {
|
if (wrapper->pHandle[i]) {
|
||||||
rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
|
rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
|
||||||
taosArrayPush(pHandle, &p);
|
taosArrayPush(pHandle, &p);
|
||||||
// size_t len = 0;
|
|
||||||
// char* name = rocksdb_column_family_handle_get_name(p, &len);
|
|
||||||
// char buf[64] = {0};
|
|
||||||
// memcpy(buf, name, len);
|
|
||||||
// qError("column name: name: %s, len: %d", buf, (int)len);
|
|
||||||
// taosMemoryFree(name);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosThreadRwlockUnlock(&wrapper->rwLock);
|
taosThreadRwlockUnlock(&wrapper->rwLock);
|
||||||
|
@ -878,6 +873,49 @@ int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamBackendTriggerChkp(void* arg, char* dst) {
|
||||||
|
SStreamMeta* pMeta = arg;
|
||||||
|
int64_t backendRid = pMeta->streamBackendRid;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
|
SArray* refs = taosArrayInit(16, sizeof(int64_t));
|
||||||
|
rocksdb_column_family_handle_t** ppCf = NULL;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
||||||
|
|
||||||
|
if (pHandle == NULL || pHandle->db == NULL) {
|
||||||
|
goto _ERROR;
|
||||||
|
}
|
||||||
|
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
|
||||||
|
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf);
|
||||||
|
|
||||||
|
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
|
||||||
|
if (code == 0) {
|
||||||
|
code = chkpDoDbCheckpoint(pHandle->db, dst);
|
||||||
|
if (code != 0) {
|
||||||
|
qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst);
|
||||||
|
} else {
|
||||||
|
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst,
|
||||||
|
taosGetTimestampMs() - st);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
qError("stream backend:%p failed to flush db at:%s", pHandle, dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
// release all ref to cfWrapper;
|
||||||
|
for (int i = 0; i < taosArrayGetSize(refs); i++) {
|
||||||
|
int64_t id = *(int64_t*)taosArrayGet(refs, i);
|
||||||
|
taosReleaseRef(streamBackendCfWrapperId, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
_ERROR:
|
||||||
|
taosReleaseRef(streamBackendId, backendRid);
|
||||||
|
taosArrayDestroy(refs);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
SStreamMeta* pMeta = arg;
|
SStreamMeta* pMeta = arg;
|
||||||
int64_t backendRid = pMeta->streamBackendRid;
|
int64_t backendRid = pMeta->streamBackendRid;
|
||||||
|
@ -902,7 +940,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
|
|
||||||
// Get all cf and acquire cfWrappter
|
// Get all cf and acquire cfWrappter
|
||||||
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
|
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
|
||||||
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, 0);
|
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
|
||||||
|
|
||||||
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
|
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
@ -928,6 +966,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
|
|
||||||
// delete obsolte checkpoint
|
// delete obsolte checkpoint
|
||||||
delObsoleteCheckpoint(arg, pChkpDir);
|
delObsoleteCheckpoint(arg, pChkpDir);
|
||||||
|
pMeta->chkpId = checkpointId;
|
||||||
}
|
}
|
||||||
|
|
||||||
_ERROR:
|
_ERROR:
|
||||||
|
|
|
@ -175,8 +175,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
|
||||||
// return -1;
|
// return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||||
|
streamBackendLoadCheckpointInfo(pMeta);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "streamSnapshot.h"
|
#include "streamSnapshot.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "rocksdb/c.h"
|
#include "rocksdb/c.h"
|
||||||
|
#include "streamBackendRocksdb.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
|
||||||
enum SBackendFileType {
|
enum SBackendFileType {
|
||||||
|
@ -79,7 +80,7 @@ const char* ROCKSDB_CURRENT = "CURRENT";
|
||||||
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
|
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
|
||||||
static int64_t kBlockSize = 64 * 1024;
|
static int64_t kBlockSize = 64 * 1024;
|
||||||
|
|
||||||
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId);
|
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId, void* pMeta);
|
||||||
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
|
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
|
||||||
|
|
||||||
// static void streamBuildFname(char* path, char* file, char* fullname)
|
// static void streamBuildFname(char* path, char* file, char* fullname)
|
||||||
|
@ -107,23 +108,48 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
|
||||||
return taosOpenFile(fullname, opt);
|
return taosOpenFile(fullname, opt);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
|
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
|
||||||
// impl later
|
// impl later
|
||||||
int len = strlen(path);
|
int len = strlen(path);
|
||||||
char* tdir = taosMemoryCalloc(1, len + 128);
|
char* tdir = taosMemoryCalloc(1, len + 256);
|
||||||
memcpy(tdir, path, len);
|
memcpy(tdir, path, len);
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int8_t chkpFlag = 0;
|
||||||
if (chkpId != 0) {
|
if (chkpId != 0) {
|
||||||
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
|
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
|
||||||
chkpId);
|
chkpId);
|
||||||
|
if (taosIsDir(tdir)) {
|
||||||
|
chkpFlag = 1;
|
||||||
|
qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
|
||||||
} else {
|
} else {
|
||||||
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
|
qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
|
||||||
}
|
}
|
||||||
int32_t code = 0;
|
}
|
||||||
|
|
||||||
|
if (chkpFlag == 0) {
|
||||||
|
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
|
||||||
|
char* chkpdir = taosMemoryCalloc(1, len + 256);
|
||||||
|
sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp");
|
||||||
|
taosMemoryFree(tdir);
|
||||||
|
|
||||||
|
tdir = chkpdir;
|
||||||
|
qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tdir);
|
||||||
|
|
||||||
|
code = streamBackendTriggerChkp(pMeta, tdir);
|
||||||
|
if (code != 0) {
|
||||||
|
qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tdir);
|
||||||
|
taosMemoryFree(tdir);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir);
|
||||||
|
|
||||||
TdDirPtr pDir = taosOpenDir(tdir);
|
TdDirPtr pDir = taosOpenDir(tdir);
|
||||||
if (NULL == pDir) {
|
if (NULL == pDir) {
|
||||||
qError("stream-state failed to open %s", tdir);
|
qError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,15 +182,31 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (strlen(name) >= strlen(ROCKSDB_SST) &&
|
if (strlen(name) >= strlen(ROCKSDB_SST) &&
|
||||||
0 == strncmp(name - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
|
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
|
||||||
char* sst = taosStrdup(name);
|
char* sst = taosStrdup(name);
|
||||||
taosArrayPush(pFile->pSst, &sst);
|
taosArrayPush(pFile->pSst, &sst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
char* buf = taosMemoryCalloc(1, 512);
|
||||||
|
sprintf(buf, "[current: %s,", pFile->pCurrent);
|
||||||
|
sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
|
||||||
|
sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);
|
||||||
|
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
|
||||||
|
char* name = taosArrayGetP(pFile->pSst, i);
|
||||||
|
sprintf(buf + strlen(buf), "%s,", name);
|
||||||
|
}
|
||||||
|
sprintf(buf + strlen(buf) - 1, "]");
|
||||||
|
|
||||||
|
qInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
}
|
||||||
|
|
||||||
taosCloseDir(&pDir);
|
taosCloseDir(&pDir);
|
||||||
|
|
||||||
if (pFile->pCurrent == NULL) {
|
if (pFile->pCurrent == NULL) {
|
||||||
qError("stream-state failed to open %s, reason: no valid file", tdir);
|
qError("%s failed to open %s, reason: no valid file", STREAM_STATE_TRANSFER, tdir);
|
||||||
code = -1;
|
code = -1;
|
||||||
tdir = NULL;
|
tdir = NULL;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -221,6 +263,12 @@ _err:
|
||||||
|
|
||||||
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
||||||
SBanckendFile* pFile = handle->pBackendFile;
|
SBanckendFile* pFile = handle->pBackendFile;
|
||||||
|
|
||||||
|
if (handle->checkpointId == 0) {
|
||||||
|
if (taosIsDir(pFile->path)) {
|
||||||
|
taosRemoveDir(pFile->path);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (pFile) {
|
if (pFile) {
|
||||||
taosMemoryFree(pFile->pCheckpointMeta);
|
taosMemoryFree(pFile->pCheckpointMeta);
|
||||||
taosMemoryFree(pFile->pCurrent);
|
taosMemoryFree(pFile->pCurrent);
|
||||||
|
@ -234,7 +282,6 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
||||||
taosArrayDestroy(pFile->pSst);
|
taosArrayDestroy(pFile->pSst);
|
||||||
taosMemoryFree(pFile);
|
taosMemoryFree(pFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(handle->pFileList);
|
taosArrayDestroy(handle->pFileList);
|
||||||
taosCloseFile(&handle->fd);
|
taosCloseFile(&handle->fd);
|
||||||
return;
|
return;
|
||||||
|
@ -247,7 +294,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) {
|
if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId, pMeta) < 0) {
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -279,24 +326,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
|
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
|
||||||
qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name,
|
qDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name,
|
qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
||||||
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type,
|
qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
|
||||||
tstrerror(code));
|
item->type, tstrerror(code));
|
||||||
return -1;
|
return -1;
|
||||||
} else if (nread > 0 && nread <= kBlockSize) {
|
} else if (nread > 0 && nread <= kBlockSize) {
|
||||||
// left bytes less than kBlockSize
|
// left bytes less than kBlockSize
|
||||||
qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name,
|
qDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
pHandle->offset += nread;
|
pHandle->offset += nread;
|
||||||
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
||||||
taosCloseFile(&pHandle->fd);
|
taosCloseFile(&pHandle->fd);
|
||||||
|
@ -304,7 +351,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
pHandle->currFileIdx += 1;
|
pHandle->currFileIdx += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx);
|
qDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
|
||||||
|
pHandle->currFileIdx);
|
||||||
taosCloseFile(&pHandle->fd);
|
taosCloseFile(&pHandle->fd);
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
pHandle->currFileIdx += 1;
|
pHandle->currFileIdx += 1;
|
||||||
|
@ -321,8 +369,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
||||||
pHandle->offset += nread;
|
pHandle->offset += nread;
|
||||||
|
|
||||||
qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
|
qDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
|
||||||
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
|
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
|
||||||
|
@ -377,7 +425,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pHandle->fd == NULL) {
|
if (pHandle->fd == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
|
qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -386,7 +434,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
|
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
|
||||||
if (bytes != pHdr->size) {
|
if (bytes != pHdr->size) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
|
qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
pHandle->offset += bytes;
|
pHandle->offset += bytes;
|
||||||
|
@ -404,7 +452,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pHandle->fd == NULL) {
|
if (pHandle->fd == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
|
qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -428,7 +476,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
|
||||||
n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size);
|
n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
qDebug("stream snap get file list, %s", buf);
|
qDebug("%s snap get file list, %s", STREAM_STATE_TRANSFER, buf);
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -262,7 +262,6 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
#define ASYNC_CHECK_HANDLE(exh1, id) \
|
#define ASYNC_CHECK_HANDLE(exh1, id) \
|
||||||
do { \
|
do { \
|
||||||
if (id > 0) { \
|
if (id > 0) { \
|
||||||
tTrace("handle step1"); \
|
|
||||||
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
|
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
|
||||||
if (exh2 == NULL || id != exh2->refId) { \
|
if (exh2 == NULL || id != exh2->refId) { \
|
||||||
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \
|
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \
|
||||||
|
|
Loading…
Reference in New Issue