Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
b054cee72f
|
@ -527,10 +527,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
streamObj.version = 1;
|
||||
streamObj.sql = pCreate->sql;
|
||||
streamObj.smaId = smaObj.uid;
|
||||
streamObj.watermark = 0;
|
||||
streamObj.trigger = STREAM_TRIGGER_AT_ONCE;
|
||||
streamObj.watermark = pCreate->watermark;
|
||||
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
||||
streamObj.triggerParam = pCreate->maxDelay;
|
||||
streamObj.ast = strdup(smaObj.ast);
|
||||
|
||||
// check the maxDelay
|
||||
if (streamObj.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
||||
int64_t msInterval = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND);
|
||||
streamObj.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
|
||||
}
|
||||
if (streamObj.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
|
||||
streamObj.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
|
||||
}
|
||||
|
||||
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
|
||||
mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -238,7 +238,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
|
|||
// tsdbCache
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(SLRUCache *pCache);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
|
|
|
@ -147,6 +147,8 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tDecoderClear(&pTbCur->mr.coder);
|
||||
|
||||
metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
|
||||
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
|
||||
continue;
|
||||
|
|
|
@ -173,20 +173,64 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
STSRow *cacheRow = NULL;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
((void)(row));
|
||||
// ((void)(row));
|
||||
|
||||
// getTableCacheKey(uid, "l", key, &keyLen);
|
||||
getTableCacheKey(uid, 1, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
TSKEY keyTs = row->ts;
|
||||
bool invalidate = false;
|
||||
|
||||
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||
int16_t nCol = taosArrayGetSize(pLast);
|
||||
int16_t iCol = 0;
|
||||
|
||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
if (keyTs > tTsVal->ts) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = keyTs});
|
||||
|
||||
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
|
||||
}
|
||||
|
||||
for (++iCol; iCol < nCol; ++iCol) {
|
||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
if (keyTs >= tTsVal->ts) {
|
||||
SColVal *tColVal = &tTsVal->colVal;
|
||||
|
||||
SColVal colVal = {0};
|
||||
tTSRowGetVal(row, pTSchema, iCol, &colVal);
|
||||
if (colVal.isNone || colVal.isNull) {
|
||||
if (keyTs == tTsVal->ts && !tColVal->isNone && !tColVal->isNull) {
|
||||
invalidate = true;
|
||||
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
||||
taosLRUCacheRelease(pCache, h, invalidate);
|
||||
|
||||
// clear last cache anyway, lazy load when get last lookup
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
// taosLRUCacheRelease(pCache, h, true);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -516,12 +560,46 @@ typedef struct SMemNextRowIter {
|
|||
SMEMNEXTROWSTATES state;
|
||||
STbData *pMem; // [input]
|
||||
STbDataIter iter; // mem buffer skip list iterator
|
||||
// bool iterOpened;
|
||||
// TSDBROW *curRow;
|
||||
} SMemNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) {
|
||||
// static int32_t getNextRowFromMem(void *iter, SArray *pRowArray) {
|
||||
SMemNextRowIter *state = (SMemNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
/*
|
||||
if (!state->iterOpened) {
|
||||
if (state->pMem != NULL) {
|
||||
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
|
||||
|
||||
state->iterOpened = true;
|
||||
|
||||
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
|
||||
if (pMemRow) {
|
||||
state->curRow = pMemRow;
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pRowArray, state->curRow);
|
||||
while (tsdbTbDataIterNext(&state->iter)) {
|
||||
TSDBROW *row = tsdbTbDataIterGet(&state->iter);
|
||||
|
||||
if (TSDBROW_TS(row) < TSDBROW_TS(state->curRow)) {
|
||||
state->curRow = row;
|
||||
break;
|
||||
} else {
|
||||
taosArrayPush(pRowArray, row);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
*/
|
||||
switch (state->state) {
|
||||
case SMEMNEXTROW_ENTER: {
|
||||
if (state->pMem != NULL) {
|
||||
|
@ -599,7 +677,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
|
||||
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
|
||||
bool deleted = false;
|
||||
while (*iSkyline > 0) {
|
||||
TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
|
||||
|
@ -626,9 +704,11 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
|
|||
}
|
||||
|
||||
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
|
||||
// typedef int32_t (*_next_row_fn_t)(void *iter, SArray *pRowArray);
|
||||
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
|
||||
|
||||
typedef struct TsdbNextRowState {
|
||||
// typedef struct TsdbNextRowState {
|
||||
typedef struct {
|
||||
TSDBROW *pRow;
|
||||
bool stop;
|
||||
bool next;
|
||||
|
@ -637,6 +717,388 @@ typedef struct TsdbNextRowState {
|
|||
_next_row_clear_fn_t nextRowClearFn;
|
||||
} TsdbNextRowState;
|
||||
|
||||
typedef struct {
|
||||
// STsdb *pTsdb;
|
||||
SArray *pSkyline;
|
||||
int64_t iSkyline;
|
||||
|
||||
SBlockIdx idx;
|
||||
SMemNextRowIter memState;
|
||||
SMemNextRowIter imemState;
|
||||
SFSNextRowIter fsState;
|
||||
TSDBROW memRow, imemRow, fsRow;
|
||||
|
||||
TsdbNextRowState input[3];
|
||||
} CacheNextRowIter;
|
||||
|
||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
|
||||
int code = 0;
|
||||
|
||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
STbData *pMem = NULL;
|
||||
if (pTsdb->mem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
|
||||
}
|
||||
|
||||
STbData *pIMem = NULL;
|
||||
if (pTsdb->imem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
|
||||
}
|
||||
|
||||
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||
|
||||
SDelIdx delIdx;
|
||||
|
||||
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
||||
if (pDelFile) {
|
||||
SDelFReader *pDelFReader;
|
||||
|
||||
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pIter->pSkyline);
|
||||
if (code) goto _err;
|
||||
|
||||
tsdbDelFReaderClose(&pDelFReader);
|
||||
} else {
|
||||
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
|
||||
|
||||
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
|
||||
|
||||
pIter->fsState.state = SFSNEXTROW_FS;
|
||||
pIter->fsState.pTsdb = pTsdb;
|
||||
pIter->fsState.pBlockIdxExp = &pIter->idx;
|
||||
|
||||
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
|
||||
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
|
||||
pIter->input[2] =
|
||||
(TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
||||
|
||||
if (pMem) {
|
||||
pIter->memState.pMem = pMem;
|
||||
pIter->memState.state = SMEMNEXTROW_ENTER;
|
||||
pIter->input[0].stop = false;
|
||||
pIter->input[0].next = true;
|
||||
}
|
||||
|
||||
if (pIMem) {
|
||||
pIter->imemState.pMem = pIMem;
|
||||
pIter->imemState.state = SMEMNEXTROW_ENTER;
|
||||
pIter->input[1].stop = false;
|
||||
pIter->input[1].next = true;
|
||||
}
|
||||
|
||||
return code;
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
||||
int code = 0;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (pIter->input[i].nextRowClearFn) {
|
||||
pIter->input[i].nextRowClearFn(pIter->input[i].iter);
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->pSkyline) {
|
||||
taosArrayDestroy(pIter->pSkyline);
|
||||
}
|
||||
|
||||
return code;
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
// iterate next row non deleted backward ts, version (from high to low)
|
||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
|
||||
int code = 0;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pIter->input[i].pRow == NULL) {
|
||||
pIter->input[i].stop = true;
|
||||
pIter->input[i].next = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
|
||||
*ppRow = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
// select maxpoint(s) from mem, imem, fs
|
||||
TSDBROW *max[3] = {0};
|
||||
int iMax[3] = {-1, -1, -1};
|
||||
int nMax = 0;
|
||||
TSKEY maxKey = TSKEY_MIN;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
||||
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
||||
|
||||
// merging & deduplicating on client side
|
||||
if (maxKey <= key.ts) {
|
||||
if (maxKey < key.ts) {
|
||||
nMax = 0;
|
||||
maxKey = key.ts;
|
||||
}
|
||||
|
||||
iMax[nMax] = i;
|
||||
max[nMax++] = pIter->input[i].pRow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete detection
|
||||
TSDBROW *merge[3] = {0};
|
||||
int iMerge[3] = {-1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||
|
||||
bool deleted = tsdbKeyDeleted(&maxKey, pIter->pSkyline, &pIter->iSkyline);
|
||||
if (!deleted) {
|
||||
iMerge[nMerge] = iMax[i];
|
||||
merge[nMerge++] = max[i];
|
||||
}
|
||||
|
||||
pIter->input[iMax[i]].next = deleted;
|
||||
}
|
||||
|
||||
if (nMerge > 0) {
|
||||
pIter->input[iMerge[0]].next = true;
|
||||
|
||||
*ppRow = merge[0];
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}
|
||||
|
||||
return code;
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow2(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
int16_t iCol = 0;
|
||||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow);
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = TSDBROW_TS(pRow);
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs});
|
||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (iCol = 1; iCol < nCol; ++iCol) {
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
|
||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pColVal->isNone && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if ((TSDBROW_TS(pRow) < lastRowTs)) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
}
|
||||
|
||||
// merge into pColArray
|
||||
setNoneCol = false;
|
||||
for (iCol = noneCol; iCol < nCol; ++iCol) {
|
||||
// high version's column value
|
||||
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol);
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
if (tColVal->isNone && !pColVal->isNone) {
|
||||
taosArraySet(pColArray, iCol, pColVal);
|
||||
} else if (tColVal->isNone && pColVal->isNone && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
} while (setNoneCol);
|
||||
|
||||
// build the result ts row here
|
||||
*dup = false;
|
||||
if (taosArrayGetSize(pColArray) == nCol) {
|
||||
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLast2(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
int16_t iCol = 0;
|
||||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow);
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
}
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = rowTs;
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs});
|
||||
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (iCol = 1; iCol < nCol; ++iCol) {
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
|
||||
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if ((pColVal->isNone || pColVal->isNull) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
/*
|
||||
if ((TSDBROW_TS(pRow) < lastRowTs)) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
}
|
||||
*/
|
||||
// merge into pColArray
|
||||
setNoneCol = false;
|
||||
for (iCol = noneCol; iCol < nCol; ++iCol) {
|
||||
// high version's column value
|
||||
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol);
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
if ((tColVal->isNone || tColVal->isNull) && (!pColVal->isNone && !pColVal->isNull)) {
|
||||
taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||
//} else if (tColVal->isNone && pColVal->isNone && !setNoneCol) {
|
||||
} else if ((tColVal->isNone || tColVal->isNull) && (pColVal->isNone || pColVal->isNull) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
} while (setNoneCol);
|
||||
|
||||
// build the result ts row here
|
||||
//*dup = false;
|
||||
if (taosArrayGetSize(pColArray) <= 0) {
|
||||
*ppLastArray = NULL;
|
||||
taosArrayDestroy(pColArray);
|
||||
} else {
|
||||
*ppLastArray = pColArray;
|
||||
}
|
||||
/* if (taosArrayGetSize(pColArray) == nCol) {
|
||||
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}*/
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
// taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
// taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
SArray *pSkyline = NULL;
|
||||
|
@ -682,7 +1144,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
|
||||
SBlockIdx idx = {.suid = suid, .uid = uid};
|
||||
|
||||
|
@ -719,6 +1181,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
do {
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (input[i].next && !input[i].stop) {
|
||||
if (input[i].pRow == NULL) {
|
||||
code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
|
@ -728,6 +1191,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (input[0].stop && input[1].stop && input[2].stop) {
|
||||
break;
|
||||
|
@ -758,14 +1222,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
|
||||
// delete detection
|
||||
TSDBROW *merge[3] = {0};
|
||||
// int iMerge[3] = {-1, -1, -1};
|
||||
int iMerge[3] = {-1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||
|
||||
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
||||
if (!deleted) {
|
||||
// iMerge[nMerge] = i;
|
||||
iMerge[nMerge] = i;
|
||||
merge[nMerge++] = max[i];
|
||||
}
|
||||
|
||||
|
@ -792,7 +1256,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
}
|
||||
}
|
||||
|
||||
} while (*ppRow == NULL);
|
||||
} while (1);
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (input[i].nextRowClearFn) {
|
||||
|
@ -819,11 +1283,6 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
// static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||
int32_t code = 0;
|
||||
|
@ -873,7 +1332,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
|
||||
SBlockIdx idx = {.suid = suid, .uid = uid};
|
||||
|
||||
|
@ -1128,7 +1587,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
|
|||
} else {
|
||||
STSRow *pRow = NULL;
|
||||
bool dup = false; // which is always false for now
|
||||
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
|
||||
code = mergeLastRow2(uid, pTsdb, &dup, &pRow);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pRow == NULL) {
|
||||
if (!dup && pRow) {
|
||||
|
@ -1195,7 +1654,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
|||
// STSRow *pRow = NULL;
|
||||
// code = mergeLast(uid, pTsdb, &pRow);
|
||||
SArray *pLastArray = NULL;
|
||||
code = mergeLast(uid, pTsdb, &pLastArray);
|
||||
// code = mergeLast(uid, pTsdb, &pLastArray);
|
||||
code = mergeLast2(uid, pTsdb, &pLastArray);
|
||||
// if table's empty or error, return code of -1
|
||||
// if (code < 0 || pRow == NULL) {
|
||||
if (code < 0 || pLastArray == NULL) {
|
||||
|
|
|
@ -561,7 +561,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
|||
}
|
||||
}
|
||||
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
|
||||
|
||||
pTbData->minVersion = TMIN(pTbData->minVersion, version);
|
||||
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
|
||||
|
|
|
@ -15,8 +15,7 @@
|
|||
|
||||
#include "vnd.h"
|
||||
|
||||
const SVnodeCfg vnodeCfgDefault = {
|
||||
.vgId = -1,
|
||||
const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
||||
.dbname = "",
|
||||
.dbId = 0,
|
||||
.szPage = 4096,
|
||||
|
@ -35,7 +34,15 @@ const SVnodeCfg vnodeCfgDefault = {
|
|||
.keep0 = 5256000,
|
||||
.keep1 = 5256000},
|
||||
.walCfg =
|
||||
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
|
||||
{
|
||||
.vgId = -1,
|
||||
.fsyncPeriod = 0,
|
||||
.retentionPeriod = -1,
|
||||
.rollPeriod = -1,
|
||||
.segSize = -1,
|
||||
.retentionSize = -1,
|
||||
.level = TAOS_WAL_WRITE,
|
||||
},
|
||||
.hashBegin = 0,
|
||||
.hashEnd = 0,
|
||||
.hashMethod = 0};
|
||||
|
@ -79,7 +86,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
SJson *pNodeRetentions = tjsonCreateArray();
|
||||
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
|
||||
for (int32_t i = 0; i < nRetention; ++i) {
|
||||
SJson * pNodeRetention = tjsonCreateObject();
|
||||
SJson *pNodeRetention = tjsonCreateObject();
|
||||
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
|
||||
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
|
||||
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
|
||||
|
@ -156,7 +163,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
|
||||
if (code < 0) return -1;
|
||||
SJson * pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
|
||||
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
|
||||
int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
|
||||
if (nRetention > TSDB_RETENTION_MAX) {
|
||||
nRetention = TSDB_RETENTION_MAX;
|
||||
|
|
|
@ -230,6 +230,7 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
||||
|
||||
// preCommit
|
||||
smaPreCommit(pVnode->pSma);
|
||||
|
@ -278,6 +279,7 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
smaPostCommit(pVnode->pSma);
|
||||
|
||||
// apply the commit (TODO)
|
||||
walEndSnapshot(pVnode->pWal);
|
||||
vnodeBufPoolReset(pVnode->onCommit);
|
||||
pVnode->onCommit->next = pVnode->pPool;
|
||||
pVnode->pPool = pVnode->onCommit;
|
||||
|
|
|
@ -117,6 +117,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
// open wal
|
||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
|
||||
taosRealPath(tdir, NULL, sizeof(tdir));
|
||||
|
||||
// for test tsdb snapshot
|
||||
#if 0
|
||||
pVnode->config.walCfg.segSize = 200;
|
||||
pVnode->config.walCfg.retentionSize = 2000;
|
||||
#endif
|
||||
|
||||
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
|
||||
if (pVnode->pWal == NULL) {
|
||||
vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
|
|
|
@ -1134,12 +1134,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||
pInfo->pRes->info.capacity = pBlock->info.rows;
|
||||
|
||||
// for generating rollup SMA result, each time is an independent time serie.
|
||||
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
|
||||
if (pInfo->assignBlockUid) {
|
||||
pInfo->pRes->info.groupId = pBlock->info.uid;
|
||||
}
|
||||
|
||||
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
if (groupIdPre) {
|
||||
pInfo->pRes->info.groupId = *groupIdPre;
|
||||
|
@ -1147,6 +1141,12 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
pInfo->pRes->info.groupId = 0;
|
||||
}
|
||||
|
||||
// for generating rollup SMA result, each time is an independent time serie.
|
||||
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
|
||||
if (pInfo->assignBlockUid) {
|
||||
pInfo->pRes->info.groupId = pBlock->info.uid;
|
||||
}
|
||||
|
||||
// todo extract method
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
|
||||
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
|
||||
|
|
|
@ -461,12 +461,10 @@ static bool isDistinctOrderBy(STranslateContext* pCxt) {
|
|||
((SSelectStmt*)pCxt->pCurrStmt)->isDistinct);
|
||||
}
|
||||
|
||||
static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) {
|
||||
static bool belongTable(const SColumnNode* pCol, const STableNode* pTable) {
|
||||
int cmp = 0;
|
||||
if ('\0' != pCol->dbName[0]) {
|
||||
cmp = strcmp(pCol->dbName, pTable->dbName);
|
||||
} else {
|
||||
cmp = (QUERY_NODE_REAL_TABLE == nodeType(pTable) ? strcmp(currentDb, pTable->dbName) : 0);
|
||||
}
|
||||
if (0 == cmp) {
|
||||
cmp = strcmp(pCol->tableAlias, pTable->tableAlias);
|
||||
|
@ -630,7 +628,7 @@ static EDealRes translateColumnWithPrefix(STranslateContext* pCxt, SColumnNode**
|
|||
bool foundTable = false;
|
||||
for (size_t i = 0; i < nums; ++i) {
|
||||
STableNode* pTable = taosArrayGetP(pTables, i);
|
||||
if (belongTable(pCxt->pParseCxt->db, (*pCol), pTable)) {
|
||||
if (belongTable((*pCol), pTable)) {
|
||||
foundTable = true;
|
||||
bool foundCol = false;
|
||||
pCxt->errCode = findAndSetColumn(pCxt, pCol, pTable, &foundCol);
|
||||
|
@ -4017,8 +4015,15 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
|
|||
(NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit);
|
||||
if (NULL != pStmt->pOptions->pStreamOptions) {
|
||||
SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions;
|
||||
pReq->maxDelay = (NULL != pStreamOpt->pDelay ? ((SValueNode*)pStreamOpt->pDelay)->datum.i : 0);
|
||||
pReq->watermark = (NULL != pStreamOpt->pWatermark ? ((SValueNode*)pStreamOpt->pWatermark)->datum.i : 0);
|
||||
pReq->maxDelay = (NULL != pStreamOpt->pDelay ? ((SValueNode*)pStreamOpt->pDelay)->datum.i : -1);
|
||||
pReq->watermark = (NULL != pStreamOpt->pWatermark ? ((SValueNode*)pStreamOpt->pWatermark)->datum.i
|
||||
: TSDB_DEFAULT_ROLLUP_WATERMARK);
|
||||
if (pReq->watermark < TSDB_MIN_ROLLUP_WATERMARK) {
|
||||
pReq->watermark = TSDB_MIN_ROLLUP_WATERMARK;
|
||||
}
|
||||
if (pReq->watermark > TSDB_MAX_ROLLUP_WATERMARK) {
|
||||
pReq->watermark = TSDB_MAX_ROLLUP_WATERMARK;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->tableName, &pReq->dstVgId);
|
||||
|
|
|
@ -208,7 +208,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
||||
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||
|
||||
qInfo("task %d receive dispatch rsp", pTask->taskId);
|
||||
qDebug("task %d receive dispatch rsp", pTask->taskId);
|
||||
|
||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||
|
@ -242,7 +242,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
|
|||
}
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
qInfo("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
||||
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
||||
|
||||
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
|
||||
|
||||
|
|
|
@ -303,7 +303,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
}
|
||||
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
|
||||
|
||||
qInfo("stream continue dispatching: task %d", pTask->taskId);
|
||||
qDebug("stream continue dispatching: task %d", pTask->taskId);
|
||||
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
|
|
|
@ -83,7 +83,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
newCommitIndex = index;
|
||||
|
||||
if (gRaftDetailLog) {
|
||||
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64 " commit, pSyncNode->commitIndex:%" PRId64,
|
||||
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64
|
||||
" commit, pSyncNode->commitIndex:%" PRId64,
|
||||
newCommitIndex, pSyncNode->commitIndex);
|
||||
}
|
||||
|
||||
|
|
|
@ -824,8 +824,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
|||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d optimized index:%" PRId64 " error, msgtype:%s,%d", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType),
|
||||
pMsg->msgType);
|
||||
sError("vgId:%d optimized index:%" PRId64 " error, msgtype:%s,%d", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -1527,7 +1527,9 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
|||
char logBuf[256 + 256];
|
||||
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, "
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64
|
||||
", lastsnapshot:%" PRId64
|
||||
", standby:%d, "
|
||||
"strategy:%d, batch:%d, "
|
||||
"replica-num:%d, "
|
||||
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
|
||||
|
@ -1546,7 +1548,9 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
|||
char* s = (char*)taosMemoryMalloc(len);
|
||||
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
||||
snprintf(s, len,
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, "
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64
|
||||
", lastsnapshot:%" PRId64
|
||||
", standby:%d, "
|
||||
"strategy:%d, batch:%d, "
|
||||
"replica-num:%d, "
|
||||
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
|
||||
|
@ -1590,7 +1594,9 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
|||
char logBuf[256 + 256];
|
||||
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, "
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64
|
||||
", lastsnapshot:%" PRId64
|
||||
", standby:%d, "
|
||||
"replica-num:%d, "
|
||||
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||
|
@ -1607,7 +1613,9 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
|||
char* s = (char*)taosMemoryMalloc(len);
|
||||
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
||||
snprintf(s, len,
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, "
|
||||
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64
|
||||
", lastsnapshot:%" PRId64
|
||||
", standby:%d, "
|
||||
"replica-num:%d, "
|
||||
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||
|
@ -1636,7 +1644,9 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
|||
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
||||
|
||||
snprintf(s, len,
|
||||
"vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, "
|
||||
"vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64
|
||||
", lastsnapshot:%" PRId64
|
||||
", standby:%d, "
|
||||
"replica-num:%d, "
|
||||
"lconfig:%" PRId64 ", changing:%d, restore:%d",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
|
@ -1839,8 +1849,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2SimpleStr(&oldConfig);
|
||||
char* newStr = syncCfg2SimpleStr(pNewConfig);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s --> %s", oldConfig.replicaNum,
|
||||
pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s --> %s",
|
||||
oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
|
@ -1863,8 +1873,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2SimpleStr(&oldConfig);
|
||||
char* newStr = syncCfg2SimpleStr(pNewConfig);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%" PRId64 ", %s --> %s", oldConfig.replicaNum,
|
||||
pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%" PRId64 ", %s --> %s",
|
||||
oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
syncNodeEventLog(pSyncNode, tmpbuf);
|
||||
|
@ -2399,7 +2409,8 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
|||
// log state
|
||||
char logBuf[1024] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64 ", "
|
||||
"==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
|
||||
", "
|
||||
"electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
|
||||
ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
|
||||
ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);
|
||||
|
|
|
@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
|||
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
|
||||
if (pSyncCfg != NULL) {
|
||||
int32_t len = 512;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
memset(s, 0, len);
|
||||
|
||||
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
|
||||
|
@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||
cJSON *pJson = raftCfg2Json(pRaftCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
|||
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
|
||||
}
|
||||
|
||||
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
||||
|
|
|
@ -122,8 +122,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
|
|||
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, err,
|
||||
err, errStr, sysErr, sysErrStr);
|
||||
"wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||
snapshotIndex, err, err, errStr, sysErr, sysErrStr);
|
||||
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||
|
||||
return -1;
|
||||
|
@ -207,8 +207,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
|||
|
||||
SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
|
||||
if (pEntry->index != writeIndex) {
|
||||
sError("vgId:%d wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId, pEntry->index,
|
||||
writeIndex);
|
||||
sError("vgId:%d wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId,
|
||||
pEntry->index, writeIndex);
|
||||
pEntry->index = writeIndex;
|
||||
}
|
||||
|
||||
|
@ -272,8 +272,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
|
|||
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err,
|
||||
err, errStr, sysErr, sysErrStr);
|
||||
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||
index, err, err, errStr, sysErr, sysErrStr);
|
||||
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||
syncNodeEventLog(pData->pSyncNode, logBuf);
|
||||
} else {
|
||||
|
@ -418,8 +418,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
|||
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
|
||||
err, err, errStr, sysErr, sysErrStr);
|
||||
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||
index, err, err, errStr, sysErr, sysErrStr);
|
||||
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||
syncNodeEventLog(pData->pSyncNode, logBuf);
|
||||
} else {
|
||||
|
|
|
@ -135,7 +135,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
|
|||
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
|
||||
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
|
||||
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
|
||||
sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 ", match-index:%d, raftid:%" PRId64,
|
||||
sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
|
||||
", match-index:%d, raftid:%" PRId64,
|
||||
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
|
||||
|
||||
return -1;
|
||||
|
@ -224,7 +225,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
|
|||
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
|
||||
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
|
||||
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
|
||||
sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 ", match-index:%d, raftid:%" PRId64,
|
||||
sError("vgId:%d sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
|
||||
", match-index:%d, raftid:%" PRId64,
|
||||
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
|
||||
|
||||
return -1;
|
||||
|
@ -314,8 +316,9 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
|
|||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", "
|
||||
sDebug("vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
|
||||
", pterm:%" PRIu64 ", commit:%" PRId64
|
||||
", "
|
||||
"datalen:%d}",
|
||||
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
|
||||
pMsg->commitIndex, pMsg->dataLen);
|
||||
|
|
|
@ -55,7 +55,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
|||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", maybe replica already dropped",
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
||||
", maybe replica already dropped",
|
||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
@ -97,8 +98,9 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
|||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", reply-grant:%d", host, port,
|
||||
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
||||
", reply-grant:%d",
|
||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
||||
|
@ -181,7 +183,8 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
|||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", maybe replica already dropped",
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
||||
", maybe replica already dropped",
|
||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
@ -221,8 +224,9 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
|||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 ", reply-grant:%d", host, port,
|
||||
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
|
||||
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
|
||||
", reply-grant:%d",
|
||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
||||
|
|
|
@ -66,8 +66,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
|||
|
||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||
char logBuf[128] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64,
|
||||
pMsg->term, ths->pRaftStore->currentTerm);
|
||||
syncNodePrint2(logBuf, ths);
|
||||
sError("%s", logBuf);
|
||||
return ret;
|
||||
|
@ -190,8 +190,9 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
|
|||
|
||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||
char logBuf[128] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64,
|
||||
pMsg->term, ths->pRaftStore->currentTerm);
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv SyncRequestVoteReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
syncNodePrint2(logBuf, ths);
|
||||
sError("%s", logBuf);
|
||||
return ret;
|
||||
|
|
|
@ -153,8 +153,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
|
|||
// event log
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64, oldLastConfigIndex,
|
||||
newLastConfigIndex);
|
||||
snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64,
|
||||
oldLastConfigIndex, newLastConfigIndex);
|
||||
char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
|
||||
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
||||
taosMemoryFree(eventLog);
|
||||
|
@ -389,7 +389,9 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
|||
syncUtilU642Addr(destId.addr, host, sizeof(host), &port);
|
||||
|
||||
snprintf(s, len,
|
||||
"%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 " seq:%d ack:%d finish:%d pterm:%" PRIu64 " "
|
||||
"%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
|
||||
" seq:%d ack:%d finish:%d pterm:%" PRIu64
|
||||
" "
|
||||
"replica-index:%d %s:%d}",
|
||||
event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
|
||||
|
@ -692,7 +694,9 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
|
|||
syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);
|
||||
|
||||
snprintf(s, len,
|
||||
"%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " "
|
||||
"%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64 " e-param:%" PRId64
|
||||
" laindex:%" PRId64 " laterm:%" PRIu64
|
||||
" "
|
||||
"lcindex:%" PRId64 "}",
|
||||
event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
|
||||
pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex,
|
||||
|
|
|
@ -45,7 +45,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
|||
if (cbMeta.index > beginIndex) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " \n",
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
|
||||
", term:%" PRIu64 " \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag, cbMeta.term);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
|
@ -56,17 +57,19 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
|||
|
||||
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||
snprintf(
|
||||
logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
}
|
||||
|
||||
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
}
|
||||
|
||||
|
@ -147,8 +150,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
|
|||
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
||||
|
||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta.flag,
|
||||
cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64,
|
||||
cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
}
|
||||
|
||||
SSyncFSM* createFsm() {
|
||||
|
@ -267,7 +270,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
|
|||
pMsg->msgType = 9999;
|
||||
pMsg->contLen = 256;
|
||||
pMsg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs());
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
|
||||
taosGetTimestampMs());
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,8 +44,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
|||
if (cbMeta.index > beginIndex) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
} else {
|
||||
sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index);
|
||||
|
@ -54,17 +55,19 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
|||
|
||||
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||
snprintf(
|
||||
logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
}
|
||||
|
||||
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
}
|
||||
|
||||
|
@ -78,8 +81,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
|||
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
||||
|
||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta.flag,
|
||||
cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64,
|
||||
cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||
}
|
||||
|
||||
SSyncFSM* createFsm() {
|
||||
|
@ -188,7 +191,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
|
|||
pMsg->msgType = 9999;
|
||||
pMsg->contLen = 256;
|
||||
pMsg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs());
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
|
||||
taosGetTimestampMs());
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "tskiplist.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
|
@ -148,15 +149,131 @@ void test4() {
|
|||
raftCacheLog2((char*)"==test4 after get-and-del entry 3==", pCache);
|
||||
}
|
||||
|
||||
static char* keyFn(const void* pData) {
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
||||
return (char*)(&(pEntry->index));
|
||||
}
|
||||
|
||||
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
|
||||
|
||||
void printSkipList(SSkipList* pSkipList) {
|
||||
ASSERT(pSkipList != NULL);
|
||||
|
||||
SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);
|
||||
while (tSkipListIterNext(pIter)) {
|
||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||
ASSERT(pNode != NULL);
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||
syncEntryPrint2((char*)"", pEntry);
|
||||
}
|
||||
}
|
||||
|
||||
void delSkipListFirst(SSkipList* pSkipList, int n) {
|
||||
ASSERT(pSkipList != NULL);
|
||||
|
||||
sTrace("delete first %d -------------", n);
|
||||
SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);
|
||||
for (int i = 0; i < n; ++i) {
|
||||
tSkipListIterNext(pIter);
|
||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||
tSkipListRemoveNode(pSkipList, pNode);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SSyncRaftEntry* getLogEntry2(SSkipList* pSkipList, SyncIndex index) {
|
||||
SyncIndex index2 = index;
|
||||
SSyncRaftEntry *pEntry = NULL;
|
||||
int arraySize = 0;
|
||||
|
||||
SArray* entryPArray = tSkipListGet(pSkipList, (char*)(&index2));
|
||||
arraySize = taosArrayGetSize(entryPArray);
|
||||
if (arraySize > 0) {
|
||||
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
|
||||
ASSERT(*ppNode != NULL);
|
||||
pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
|
||||
}
|
||||
taosArrayDestroy(entryPArray);
|
||||
|
||||
sTrace("get index2: %ld, arraySize:%d -------------", index, arraySize);
|
||||
syncEntryLog2((char*)"getLogEntry2", pEntry);
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
|
||||
SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) {
|
||||
sTrace("get index: %ld -------------", index);
|
||||
SyncIndex index2 = index;
|
||||
SSyncRaftEntry *pEntry = NULL;
|
||||
SSkipListIterator* pIter = tSkipListCreateIterFromVal(pSkipList, (const char *)&index2, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
if (tSkipListIterNext(pIter)) {
|
||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||
ASSERT(pNode != NULL);
|
||||
pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||
}
|
||||
|
||||
syncEntryLog2((char*)"getLogEntry", pEntry);
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
void test5() {
|
||||
SSkipList* pSkipList =
|
||||
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
||||
ASSERT(pSkipList != NULL);
|
||||
|
||||
sTrace("insert 9 - 5");
|
||||
for (int i = 9; i >= 5; --i) {
|
||||
SSyncRaftEntry* pEntry = createEntry(i);
|
||||
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
||||
}
|
||||
|
||||
sTrace("insert 0 - 4");
|
||||
for (int i = 0; i <= 4; ++i) {
|
||||
SSyncRaftEntry* pEntry = createEntry(i);
|
||||
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
||||
}
|
||||
|
||||
sTrace("insert 7 7 7 7 7");
|
||||
for (int i = 0; i <= 4; ++i) {
|
||||
SSyncRaftEntry* pEntry = createEntry(7);
|
||||
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
||||
}
|
||||
|
||||
sTrace("print: -------------");
|
||||
printSkipList(pSkipList);
|
||||
|
||||
delSkipListFirst(pSkipList, 3);
|
||||
|
||||
sTrace("print: -------------");
|
||||
printSkipList(pSkipList);
|
||||
|
||||
getLogEntry(pSkipList, 2);
|
||||
getLogEntry(pSkipList, 5);
|
||||
getLogEntry(pSkipList, 7);
|
||||
getLogEntry(pSkipList, 7);
|
||||
|
||||
getLogEntry2(pSkipList, 2);
|
||||
getLogEntry2(pSkipList, 5);
|
||||
getLogEntry2(pSkipList, 7);
|
||||
getLogEntry2(pSkipList, 7);
|
||||
|
||||
|
||||
tSkipListDestroy(pSkipList);
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
gRaftDetailLog = true;
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG;
|
||||
|
||||
/*
|
||||
test1();
|
||||
test2();
|
||||
test3();
|
||||
test4();
|
||||
*/
|
||||
|
||||
test5();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -40,8 +40,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
|||
|
||||
if (cbMeta.index > beginIndex) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
} else {
|
||||
sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index);
|
||||
|
@ -51,15 +52,16 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
|||
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
||||
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
}
|
||||
|
||||
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||
}
|
||||
|
||||
|
@ -143,7 +145,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
|
|||
pMsg->msgType = 9999;
|
||||
pMsg->contLen = 256;
|
||||
pMsg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs());
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
|
||||
taosGetTimestampMs());
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,8 +35,8 @@ void syncRespMgrDelTest(uint64_t begin, uint64_t end) {
|
|||
}
|
||||
|
||||
void printStub(SRespStub *p) {
|
||||
printf("createTime:%" PRId64 ", rpcMsg.code:%d rpcMsg.ahandle:%" PRId64 " rpcMsg.handle:%" PRId64 " \n", p->createTime, p->rpcMsg.code,
|
||||
(int64_t)(p->rpcMsg.info.ahandle), (int64_t)(p->rpcMsg.info.handle));
|
||||
printf("createTime:%" PRId64 ", rpcMsg.code:%d rpcMsg.ahandle:%" PRId64 " rpcMsg.handle:%" PRId64 " \n",
|
||||
p->createTime, p->rpcMsg.code, (int64_t)(p->rpcMsg.info.ahandle), (int64_t)(p->rpcMsg.info.handle));
|
||||
}
|
||||
void syncRespMgrPrint() {
|
||||
printf("\n----------------syncRespMgrPrint--------------\n");
|
||||
|
|
|
@ -43,8 +43,9 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
|||
|
||||
if (cbMeta.index > beginIndex) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
} else {
|
||||
sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index);
|
||||
|
@ -54,15 +55,16 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
|||
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
||||
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,9 @@ void cleanup() { walCleanUp(); }
|
|||
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " "
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
|
||||
", term:%" PRIu64
|
||||
" "
|
||||
"currentTerm:%" PRIu64 " \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
|
||||
|
@ -50,7 +52,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
|||
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " "
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
|
||||
", term:%" PRIu64
|
||||
" "
|
||||
"currentTerm:%" PRIu64 " \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
|
||||
|
@ -60,7 +64,9 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta)
|
|||
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " "
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
|
||||
", term:%" PRIu64
|
||||
" "
|
||||
"currentTerm:%" PRIu64 " \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
|
||||
|
@ -128,7 +134,8 @@ int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) {
|
|||
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d, gSnapshotLastApplyIndex:%" PRId64 ", "
|
||||
"==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d, gSnapshotLastApplyIndex:%" PRId64
|
||||
", "
|
||||
"gSnapshotLastApplyTerm:%" PRId64,
|
||||
pFsm, pWriter, isApply, gSnapshotLastApplyIndex, gSnapshotLastApplyTerm);
|
||||
sTrace("%s", logBuf);
|
||||
|
@ -148,7 +155,8 @@ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFini
|
|||
|
||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
|
||||
char* s = syncCfg2Str(&(cbMeta.newCfg));
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64 ", newCfg:%s",
|
||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64
|
||||
", newCfg:%s",
|
||||
cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s);
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
|
@ -156,7 +164,9 @@ void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMe
|
|||
void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==LeaderTransferCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " "
|
||||
"==callback== ==LeaderTransferCb== pFsm:%p, index:%" PRId64
|
||||
", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64
|
||||
" "
|
||||
"currentTerm:%" PRIu64 " \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
|
||||
|
@ -300,7 +310,8 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
|
|||
pMsg->msgType = TDMT_VND_SUBMIT;
|
||||
pMsg->contLen = 256;
|
||||
pMsg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, taosGetTimestampMs());
|
||||
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
|
||||
taosGetTimestampMs());
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,23 +33,25 @@ const char *pDir = "./syncWriteTest";
|
|||
|
||||
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
||||
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
||||
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -141,34 +141,32 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
regfree(&idxRegPattern);
|
||||
|
||||
taosArraySort(pLogInfoArray, compareWalFileInfo);
|
||||
int oldSz = 0;
|
||||
if (pWal->fileInfoSet) {
|
||||
oldSz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
}
|
||||
int newSz = taosArrayGetSize(pLogInfoArray);
|
||||
|
||||
if (oldSz > newSz) {
|
||||
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz);
|
||||
} else if (oldSz < newSz) {
|
||||
for (int i = oldSz; i < newSz; i++) {
|
||||
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
||||
int actualFileNum = taosArrayGetSize(pLogInfoArray);
|
||||
|
||||
if (metaFileNum > actualFileNum) {
|
||||
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
|
||||
} else if (metaFileNum < actualFileNum) {
|
||||
for (int i = metaFileNum; i < actualFileNum; i++) {
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
|
||||
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pLogInfoArray);
|
||||
|
||||
pWal->writeCur = newSz - 1;
|
||||
if (newSz > 0) {
|
||||
pWal->writeCur = actualFileNum - 1;
|
||||
if (actualFileNum > 0) {
|
||||
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||
|
||||
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz - 1);
|
||||
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
|
||||
int64_t file_size = 0;
|
||||
taosStatFile(fnameStr, &file_size, NULL);
|
||||
int64_t fileSize = 0;
|
||||
taosStatFile(fnameStr, &fileSize, NULL);
|
||||
|
||||
if (oldSz != newSz || pLastFileInfo->fileSize != file_size) {
|
||||
pLastFileInfo->fileSize = file_size;
|
||||
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
|
||||
pLastFileInfo->fileSize = fileSize;
|
||||
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
|
||||
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
|
||||
ASSERT(pWal->vers.lastVer != -1);
|
||||
|
|
|
@ -99,7 +99,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
|
||||
// delete files
|
||||
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
||||
for (int i = pWal->writeCur; i < fileSetSize; i++) {
|
||||
for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
|
||||
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||
|
@ -113,18 +113,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
|
||||
if (pIdxTFile == NULL) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
// read idx file and get log file pos
|
||||
SWalIdxEntry entry;
|
||||
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
@ -133,12 +136,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
ASSERT(0);
|
||||
// TODO
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
// TODO
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
|
@ -148,6 +153,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
ASSERT(taosValidFile(pLogTFile));
|
||||
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
|
||||
if (size != sizeof(SWalCkHead)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
@ -205,15 +211,22 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
|||
pWal->vers.verInSnapshotting = ver;
|
||||
// check file rolling
|
||||
if (pWal->cfg.retentionPeriod == 0) {
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
walRoll(pWal);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walEndSnapshot(SWal *pWal) {
|
||||
int32_t code = 0;
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
int64_t ver = pWal->vers.verInSnapshotting;
|
||||
if (ver == -1) return 0;
|
||||
if (ver == -1) {
|
||||
code = -1;
|
||||
goto END;
|
||||
};
|
||||
|
||||
pWal->vers.snapshotVer = ver;
|
||||
int ts = taosGetTimestampSec();
|
||||
|
@ -229,7 +242,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
}
|
||||
// iterate files, until the searched result
|
||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) ||
|
||||
if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) ||
|
||||
(pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
|
||||
// delete according to file size or close time
|
||||
deleteCnt++;
|
||||
|
@ -259,12 +272,14 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
pWal->vers.verInSnapshotting = -1;
|
||||
|
||||
// save snapshot ver, commit ver
|
||||
int code = walSaveMeta(pWal);
|
||||
code = walSaveMeta(pWal);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
return 0;
|
||||
END:
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
int walRoll(SWal *pWal) {
|
||||
|
@ -273,14 +288,14 @@ int walRoll(SWal *pWal) {
|
|||
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
if (pWal->pWriteLogTFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pWriteLogTFile);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
TdFilePtr pIdxTFile, pLogTFile;
|
||||
|
@ -291,18 +306,20 @@ int walRoll(SWal *pWal) {
|
|||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pIdxTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
// terrno set inner
|
||||
// error code was set inner
|
||||
code = walRollFileInfo(pWal);
|
||||
if (code != 0) {
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
// switch file
|
||||
|
@ -312,7 +329,9 @@ int walRoll(SWal *pWal) {
|
|||
ASSERT(pWal->writeCur >= 0);
|
||||
|
||||
pWal->lastRollSeq = walGetSeq();
|
||||
return 0;
|
||||
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
|
|
|
@ -164,8 +164,8 @@
|
|||
# --- sma
|
||||
./test.sh -f tsim/sma/drop_sma.sim
|
||||
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||
#./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||
#./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||
|
||||
# --- valgrind
|
||||
./test.sh -f tsim/valgrind/checkError1.sim
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
$loop_cnt = 0
|
||||
check_dnode_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
|
||||
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
|
||||
if $data[0][0] != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][4] != ready then
|
||||
goto check_dnode_ready
|
||||
endi
|
||||
|
||||
sql connect
|
||||
sql create dnode $hostname port 7200
|
||||
sql create dnode $hostname port 7300
|
||||
sql create dnode $hostname port 7400
|
||||
|
||||
$loop_cnt = 0
|
||||
check_dnode_ready_1:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 10 then
|
||||
print ====> dnodes not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
|
||||
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
|
||||
if $data[0][4] != ready then
|
||||
goto check_dnode_ready_1
|
||||
endi
|
||||
if $data[1][4] != ready then
|
||||
goto check_dnode_ready_1
|
||||
endi
|
||||
if $data[2][4] != ready then
|
||||
goto check_dnode_ready_1
|
||||
endi
|
||||
if $data[3][4] != ready then
|
||||
goto check_dnode_ready_1
|
||||
endi
|
||||
|
||||
$replica = 3
|
||||
$vgroups = 1
|
||||
|
||||
print ============= create database
|
||||
sql create database db replica $replica vgroups $vgroups
|
||||
|
||||
$loop_cnt = 0
|
||||
check_db_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 100 then
|
||||
print ====> db not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show databases
|
||||
print ===> rows: $rows
|
||||
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data[2][19] != ready then
|
||||
goto check_db_ready
|
||||
endi
|
||||
|
||||
sql use db
|
||||
|
||||
$loop_cnt = 0
|
||||
check_vg_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 300 then
|
||||
print ====> vgroups not ready!
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show vgroups
|
||||
print ===> rows: $rows
|
||||
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
|
||||
|
||||
if $rows != $vgroups then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][4] == leader then
|
||||
if $data[0][6] == follower then
|
||||
if $data[0][8] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
|
||||
endi
|
||||
endi
|
||||
elif $data[0][6] == leader then
|
||||
if $data[0][4] == follower then
|
||||
if $data[0][8] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
|
||||
endi
|
||||
endi
|
||||
elif $data[0][8] == leader then
|
||||
if $data[0][4] == follower then
|
||||
if $data[0][6] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
|
||||
endi
|
||||
endi
|
||||
else
|
||||
goto check_vg_ready
|
||||
endi
|
||||
|
||||
|
||||
vg_ready:
|
||||
print ====> create stable/child table
|
||||
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create table ct1 using stb tags(1000)
|
||||
|
||||
|
||||
print ===> stop dnode4
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
sleep 3000
|
||||
|
||||
|
||||
print ===> write 100 records
|
||||
$N = 100
|
||||
$count = 0
|
||||
while $count < $N
|
||||
$ms = 1591200000000 + $count
|
||||
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
|
||||
$count = $count + 1
|
||||
endw
|
||||
|
||||
|
||||
#sql flush database db;
|
||||
|
||||
|
||||
sleep 3000
|
||||
|
||||
|
||||
print ===> stop dnode1 dnode2 dnode3
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
|
||||
|
||||
|
||||
|
||||
print ===> start dnode1 dnode2 dnode3 dnode4
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
|
Loading…
Reference in New Issue