diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d090ed5e3e..b40ab39deb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -437,7 +437,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { taosMkDir(state); code = copyFiles(chkp, state); if (code != 0) { - qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); } else { qInfo("succ to restart stream backend at checkpoint path: %s", chkp); } @@ -457,7 +457,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); - qDebug("start to init stream backend at %s", backendPath); + qDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId); uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); @@ -481,7 +481,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 3); - rocksdb_options_set_info_log_level(opts, 0); + rocksdb_options_set_info_log_level(opts, 1); rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit); rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); @@ -789,9 +789,19 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { return -1; } - qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path); + qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, checkpointDir); + if (pHandle->db != NULL) { - char* err = NULL; + char* err = NULL; + + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flush(pHandle->db, flushOpt, &err); + if (err != NULL) { + qError("failed to flush db before streamBackend clean up, reason:%s", err); + taosMemoryFree(err); + } + rocksdb_flushoptions_destroy(flushOpt); + rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err); if (cp == NULL || err != NULL) { qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); @@ -1361,7 +1371,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + // rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1482,7 +1492,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + // rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); @@ -1655,7 +1665,6 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe taosMemoryFree(val); \ if (vLen != NULL) *vLen = tlen; \ } \ - if (code == 0) qDebug("streamState str: %s succ to read from %s_%s", toString, wrapper->idstr, funcname); \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -2659,7 +2668,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb { char tbuf[256] = {0}; ginitDict[i].toStrFunc((void*)key, tbuf); - qDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[i].key); + qDebug("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen); } return 0; } @@ -2694,6 +2703,8 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { qError("streamState failed to write batch, err:%s", err); taosMemoryFree(err); return -1; + } else { + qDebug("write batch to backend opt: %p", wrapper->pBackend); } return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 69b4c39cf3..072b74cd9a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -446,7 +446,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - if (pVal != NULL && vLen != 0) { + if (pVal == NULL || vLen == 0) { break; } SCheckpointInfo info; @@ -458,7 +458,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { chkpId = TMAX(chkpId, info.checkpointId); } - + qDebug("get max chkp id: %" PRId64 "", chkpId); tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 9e61e8aaca..1396b627c2 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -422,14 +422,18 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) { #ifdef USE_ROCKSDB int32_t code = 0; - void* batch = streamStateCreateBatch(); + // void* batch = streamStateCreateBatch(); - code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0); - if (code != 0) { - return code; - } - code = streamStatePutBatch_rocksdb(pState, batch); - streamStateDestroyBatch(batch); + // code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0); + // if (code != 0) { + // return code; + // } + // code = streamStatePutBatch_rocksdb(pState, batch); + // streamStateDestroyBatch(batch); + code = streamDefaultPut_rocksdb(pState, pKey, pVal, vLen); + char* Val = NULL; + int32_t len = 0; + code = streamDefaultGet_rocksdb(pState, pKey, (void**)&Val, &len); return code; #else return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 888a08740c..65a836728b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "tstream.h" #include "executor.h" #include "streamInt.h" #include "tmisce.h" @@ -140,6 +139,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { + int64_t ver; int64_t skip64; int8_t skip8; int32_t skip32; @@ -147,6 +147,10 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) SEpSet epSet; if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &ver) < 0) return -1; + + if (ver != SSTREAM_TASK_VER) return -1; + if (tDecodeI64(pDecoder, &skip64) < 0) return -1; if (tDecodeI32(pDecoder, &skip32) < 0) return -1; if (tDecodeI32(pDecoder, &skip32) < 0) return -1; @@ -259,7 +263,7 @@ void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState); // remove the ref by timer - while(pTask->status.timerActive > 0) { + while (pTask->status.timerActive > 0) { qDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr); taosMsleep(10); } @@ -414,7 +418,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS EPSET_TO_STR(pEpSet, buf); int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); - for(int32_t i = 0; i < numOfUpstream; ++i) { + for (int32_t i = 0; i < numOfUpstream; ++i) { SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); if (pInfo->nodeId == nodeId) { epsetAssign(&pInfo->epSet, pEpSet); @@ -486,7 +490,7 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) int64_t stage = pTask->pMeta->stage; int32_t vgId = pTask->pMeta->vgId; - qDebug("s-task:%s vgId:%d restart current task, stage:%"PRId64", status:%s, sched-status:%d", id, vgId, stage, + qDebug("s-task:%s vgId:%d restart current task, stage:%" PRId64 ", status:%s, sched-status:%d", id, vgId, stage, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); // 1. stop task @@ -544,7 +548,7 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { } int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { - for(int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp); } @@ -557,7 +561,7 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { } int32_t size = taosArrayGetSize(pTask->pUpstreamInfoList); - for(int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < size; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); pInfo->stage = -1; }