Merge branch 'enh/triggerCheckPoint2' of github.com:taosdata/tdengine into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-08-18 22:58:56 +08:00
commit 15af111f1e
5 changed files with 45 additions and 25 deletions

View File

@ -990,7 +990,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
// ASSERT(code == 0);
if (code == -1) {
// for history
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ", opNum:%d",
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "",
pKey->win.skey, pKey->win.ekey, pKey->groupId);
pGroupResInfo->index += 1;
continue;

View File

@ -437,7 +437,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
taosMkDir(state);
code = copyFiles(chkp, state);
if (code != 0) {
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
qInfo("succ to restart stream backend at checkpoint path: %s", chkp);
}
@ -457,7 +457,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
qDebug("start to init stream backend at %s", backendPath);
qDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId);
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
@ -481,7 +481,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
rocksdb_options_set_recycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3);
rocksdb_options_set_info_log_level(opts, 0);
rocksdb_options_set_info_log_level(opts, 1);
rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
@ -789,9 +789,19 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
return -1;
}
qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path);
qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, checkpointDir);
if (pHandle->db != NULL) {
char* err = NULL;
char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flush(pHandle->db, flushOpt, &err);
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
}
rocksdb_flushoptions_destroy(flushOpt);
rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err);
if (cp == NULL || err != NULL) {
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err);
@ -1361,7 +1371,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
inst->dbOpt = handle->dbOpt;
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
// rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
} else {
inst = *pInst;
@ -1482,7 +1492,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL);
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
// rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
@ -1655,7 +1665,6 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
} \
if (code == 0) qDebug("streamState str: %s succ to read from %s_%s", toString, wrapper->idstr, funcname); \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
@ -1987,6 +1996,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
}
taosMemoryFree(tmp);
streamStateFreeCur(pCur);
return code;
}
@ -2658,7 +2668,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
{
char tbuf[256] = {0};
ginitDict[i].toStrFunc((void*)key, tbuf);
qDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[i].key);
qDebug("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen);
}
return 0;
}
@ -2693,6 +2703,8 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err);
return -1;
} else {
qDebug("write batch to backend opt: %p", wrapper->pBackend);
}
return 0;
}

View File

@ -446,7 +446,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal != NULL && vLen != 0) {
if (pVal == NULL || vLen == 0) {
break;
}
SCheckpointInfo info;
@ -458,7 +458,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
chkpId = TMAX(chkpId, info.checkpointId);
}
qDebug("get max chkp id: %" PRId64 "", chkpId);
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);

View File

@ -422,14 +422,18 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
int32_t code = 0;
void* batch = streamStateCreateBatch();
// void* batch = streamStateCreateBatch();
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
if (code != 0) {
return code;
}
code = streamStatePutBatch_rocksdb(pState, batch);
streamStateDestroyBatch(batch);
// code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
// if (code != 0) {
// return code;
// }
// code = streamStatePutBatch_rocksdb(pState, batch);
// streamStateDestroyBatch(batch);
code = streamDefaultPut_rocksdb(pState, pKey, pVal, vLen);
char* Val = NULL;
int32_t len = 0;
code = streamDefaultGet_rocksdb(pState, pKey, (void**)&Val, &len);
return code;
#else
return 0;

View File

@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
#include "executor.h"
#include "streamInt.h"
#include "tmisce.h"
@ -140,6 +139,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
}
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t ver;
int64_t skip64;
int8_t skip8;
int32_t skip32;
@ -147,6 +147,10 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
SEpSet epSet;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
@ -259,7 +263,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
// remove the ref by timer
while(pTask->status.timerActive > 0) {
while (pTask->status.timerActive > 0) {
qDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr);
taosMsleep(10);
}
@ -414,7 +418,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
EPSET_TO_STR(pEpSet, buf);
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < numOfUpstream; ++i) {
for (int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
if (pInfo->nodeId == nodeId) {
epsetAssign(&pInfo->epSet, pEpSet);
@ -486,7 +490,7 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask)
int64_t stage = pTask->pMeta->stage;
int32_t vgId = pTask->pMeta->vgId;
qDebug("s-task:%s vgId:%d restart current task, stage:%"PRId64", status:%s, sched-status:%d", id, vgId, stage,
qDebug("s-task:%s vgId:%d restart current task, stage:%" PRId64 ", status:%s, sched-status:%d", id, vgId, stage,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
// 1. stop task
@ -544,7 +548,7 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
}
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
for(int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp);
}
@ -557,7 +561,7 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
}
int32_t size = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < size; ++i) {
for (int32_t i = 0; i < size; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
pInfo->stage = -1;
}