diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b4ae30910c..b241ae9b41 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -30,6 +30,7 @@ extern "C" { typedef struct SStreamTask SStreamTask; +#define SSTREAM_TASK_VER 1 enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, @@ -266,13 +267,13 @@ typedef struct SCheckpointInfo { } SCheckpointInfo; typedef struct SStreamStatus { - int8_t taskStatus; - int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set - int8_t schedStatus; - int8_t keepTaskStatus; - bool transferState; - int8_t timerActive; // timer is active - int8_t pauseAllowed; // allowed task status to be set to be paused + int8_t taskStatus; + int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set + int8_t schedStatus; + int8_t keepTaskStatus; + bool transferState; + int8_t timerActive; // timer is active + int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; typedef struct SHistDataRange { @@ -309,6 +310,7 @@ typedef struct { } STaskTimestamp; struct SStreamTask { + int64_t ver; SStreamId id; SSTaskBasicInfo info; STaskOutputInfo outputInfo; @@ -589,10 +591,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); int32_t streamTaskEndScanWAL(SStreamTask* pTask); -SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); -int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); +SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); +int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); -char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); // recover and fill history void streamTaskCheckDownstreamTasks(SStreamTask* pTask); @@ -628,7 +630,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); // agg level int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, + SRpcHandleInfo* pRpcInfo); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta @@ -642,7 +645,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it +int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); @@ -659,7 +662,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a8a719edda..3dab144eef 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -70,6 +70,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, innerSz) < 0) return -1; for (int32_t j = 0; j < innerSz; j++) { SStreamTask *pTask = taosArrayGetP(pArray, j); + pTask->ver = SSTREAM_TASK_VER; if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; } } @@ -154,7 +155,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { return 0; } -static void* freeStreamTasks(SArray* pTaskLevel) { +static void *freeStreamTasks(SArray *pTaskLevel) { int32_t numOfLevel = taosArrayGetSize(pTaskLevel); for (int32_t i = 0; i < numOfLevel; i++) { SArray *pLevel = taosArrayGetP(pTaskLevel, i); @@ -192,14 +193,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); if (pVgEpNew == NULL) return NULL; pVgEpNew->vgId = pVgEp->vgId; -// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); + // pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); pVgEpNew->epSet = pVgEp->epSet; return pVgEpNew; } void tDeleteSMqVgEp(SMqVgEp *pVgEp) { if (pVgEp) { -// taosMemoryFreeClear(pVgEp->qmsg); + // taosMemoryFreeClear(pVgEp->qmsg); taosMemoryFree(pVgEp); } } @@ -207,14 +208,14 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); -// tlen += taosEncodeString(buf, pVgEp->qmsg); + // tlen += taosEncodeString(buf, pVgEp->qmsg); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - if(sver == 1){ + if (sver == 1) { uint64_t size = 0; buf = taosDecodeVariantU64(buf, &size); buf = POINTER_SHIFT(buf, size); @@ -223,7 +224,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { return (void *)buf; } -SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) { +SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) { SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); if (pConsumer == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -260,12 +261,12 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) { } void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) { - if(pConsumer == NULL) return; + if (pConsumer == NULL) return; taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); - if(delete){ + if (delete) { taosMemoryFree(pConsumer); } } @@ -392,7 +393,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s taosArrayPush(pConsumer->assignedTopics, &topic); } - if(sver > 1){ + if (sver > 1) { buf = taosDecodeFixedI8(buf, &pConsumer->withTbName); buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); @@ -401,18 +402,18 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s return (void *)buf; } -//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { -// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); -// if (pConsumerEpNew == NULL) return NULL; -// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; -// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); -// return pConsumerEpNew; -//} +// SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { +// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); +// if (pConsumerEpNew == NULL) return NULL; +// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; +// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); +// return pConsumerEpNew; +// } // -//void tDeleteSMqConsumerEp(void *data) { -// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; -// taosArrayDestroy(pConsumerEp->vgs); -//} +// void tDeleteSMqConsumerEp(void *data) { +// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; +// taosArrayDestroy(pConsumerEp->vgs); +// } int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { int32_t tlen = 0; @@ -420,7 +421,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows); tlen += taosEncodeFixedI32(buf, szVgs); - for (int32_t j= 0; j < szVgs; ++j) { + for (int32_t j = 0; j < szVgs; ++j) { OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI64(buf, offRows->rows); @@ -434,28 +435,28 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { // do nothing } } -//#if 0 -// int32_t sz = taosArrayGetSize(pConsumerEp->vgs); -// tlen += taosEncodeFixedI32(buf, sz); -// for (int32_t i = 0; i < sz; i++) { -// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); -// tlen += tEncodeSMqVgEp(buf, pVgEp); -// } -//#endif + // #if 0 + // int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + // tlen += taosEncodeFixedI32(buf, sz); + // for (int32_t i = 0; i < sz; i++) { + // SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); + // tlen += tEncodeSMqVgEp(buf, pVgEp); + // } + // #endif return tlen; } void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); - if (sver > 1){ + if (sver > 1) { int32_t szVgs = 0; buf = taosDecodeFixedI32(buf, &szVgs); - if(szVgs > 0){ + if (szVgs > 0) { pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); if (NULL == pConsumerEp->offsetRows) return NULL; - for (int32_t j= 0; j < szVgs; ++j) { - OffsetRows* offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); + for (int32_t j = 0; j < szVgs; ++j) { + OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI8(buf, &offRows->offset.type); @@ -470,21 +471,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s } } } -//#if 0 -// int32_t sz; -// buf = taosDecodeFixedI32(buf, &sz); -// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); -// for (int32_t i = 0; i < sz; i++) { -// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); -// buf = tDecodeSMqVgEp(buf, pVgEp); -// taosArrayPush(pConsumerEp->vgs, &pVgEp); -// } -//#endif + // #if 0 + // int32_t sz; + // buf = taosDecodeFixedI32(buf, &sz); + // pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); + // for (int32_t i = 0; i < sz; i++) { + // SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); + // buf = tDecodeSMqVgEp(buf, pVgEp); + // taosArrayPush(pConsumerEp->vgs, &pVgEp); + // } + // #endif return (void *)buf; } -SMqSubscribeObj *tNewSubscribeObj(const char* key) { +SMqSubscribeObj *tNewSubscribeObj(const char *key) { SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); if (pSubObj == NULL) { return NULL; @@ -577,7 +578,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { int32_t szVgs = taosArrayGetSize(pSub->offsetRows); tlen += taosEncodeFixedI32(buf, szVgs); - for (int32_t j= 0; j < szVgs; ++j) { + for (int32_t j = 0; j < szVgs; ++j) { OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j); tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI64(buf, offRows->rows); @@ -617,14 +618,14 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeStringTo(buf, pSub->dbName); - if (sver > 1){ + if (sver > 1) { int32_t szVgs = 0; buf = taosDecodeFixedI32(buf, &szVgs); - if(szVgs > 0){ + if (szVgs > 0) { pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); if (NULL == pSub->offsetRows) return NULL; - for (int32_t j= 0; j < szVgs; ++j) { - OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1); + for (int32_t j = 0; j < szVgs; ++j) { + OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1); buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI8(buf, &offRows->offset.type); @@ -639,71 +640,71 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { } } buf = taosDecodeString(buf, &pSub->qmsg); - }else{ + } else { pSub->qmsg = taosStrdup(""); } return (void *)buf; } -//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { -// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); -// if (pEntryNew == NULL) return NULL; -// pEntryNew->epoch = pEntry->epoch; -// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp); -// return pEntryNew; -//} +// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { +// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); +// if (pEntryNew == NULL) return NULL; +// pEntryNew->epoch = pEntry->epoch; +// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp); +// return pEntryNew; +// } // -//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { -// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); -//} +// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { +// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); +// } -//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { -// int32_t tlen = 0; -// tlen += taosEncodeFixedI32(buf, pEntry->epoch); -// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry); -// return tlen; -//} +// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { +// int32_t tlen = 0; +// tlen += taosEncodeFixedI32(buf, pEntry->epoch); +// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry); +// return tlen; +// } // -//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) { -// buf = taosDecodeFixedI32(buf, &pEntry->epoch); -// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); -// return (void *)buf; -//} +// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) { +// buf = taosDecodeFixedI32(buf, &pEntry->epoch); +// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); +// return (void *)buf; +// } -//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { -// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); -// if (pLogNew == NULL) return pLogNew; -// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); -// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp); -// return pLogNew; -//} +// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { +// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); +// if (pLogNew == NULL) return pLogNew; +// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); +// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp); +// return pLogNew; +// } // -//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { -// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); -//} +// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { +// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); +// } -//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { -// int32_t tlen = 0; -// tlen += taosEncodeString(buf, pLog->key); -// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry); -// return tlen; -//} +// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { +// int32_t tlen = 0; +// tlen += taosEncodeString(buf, pLog->key); +// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry); +// return tlen; +// } // -//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { -// buf = taosDecodeStringTo(buf, pLog->key); -// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); -// return (void *)buf; -//} +// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { +// buf = taosDecodeStringTo(buf, pLog->key); +// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); +// return (void *)buf; +// } // -//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) { -// int32_t tlen = 0; -// tlen += taosEncodeString(buf, pOffset->key); -// tlen += taosEncodeFixedI64(buf, pOffset->offset); -// return tlen; -//} +// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) { +// int32_t tlen = 0; +// tlen += taosEncodeString(buf, pOffset->key); +// tlen += taosEncodeFixedI64(buf, pOffset->offset); +// return tlen; +// } // -//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { -// buf = taosDecodeStringTo(buf, pOffset->key); -// buf = taosDecodeFixedI64(buf, &pOffset->offset); -// return buf; -//} +// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { +// buf = taosDecodeStringTo(buf, pOffset->key); +// buf = taosDecodeFixedI64(buf, &pOffset->offset); +// return buf; +// } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cf1d445149..dd53f47964 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -140,7 +140,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { void *buf = NULL; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) { + mError("stream read invalid data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream manually", + tsDataDir); + goto STREAM_DECODE_OVER; + } if (sver != MND_STREAM_VER_NUMBER) { terrno = 0; @@ -429,9 +433,11 @@ FAIL: return 0; } -int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { +int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { SEncoder encoder; tEncoderInit(&encoder, NULL, 0); + + pTask->ver = SSTREAM_TASK_VER; tEncodeStreamTask(&encoder, pTask); int32_t size = encoder.pos; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8534f3b0a1..4a0ce81e68 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -218,11 +218,11 @@ _EXIT: } void streamBackendCleanup(void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)arg; - RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); + void* pIter = taosHashIterate(pHandle->cfInst, NULL); while (pIter != NULL) { - RocksdbCfInst* inst = *pIter; + RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; destroyRocksdbCfInst(inst); - taosHashIterate(pHandle->cfInst, pIter); + pIter = taosHashIterate(pHandle->cfInst, pIter); } taosHashCleanup(pHandle->cfInst); @@ -833,7 +833,10 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t qDebug("succ to open rocksdb cf"); } // close default cf - if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]); + if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) { + rocksdb_column_family_handle_destroy(cfHandle[0]); + cfHandle[0] = NULL; + } rocksdb_options_destroy(cfOpts[0]); handle->db = db; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ae07738868..3a60dcdb80 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -202,6 +202,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; int32_t code; + pTask->ver = SSTREAM_TASK_VER; tEncodeSize(tEncodeStreamTask, pTask, len, code); if (code < 0) { return -1; @@ -331,7 +332,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); - while(1) { + while (1) { taosRLockLatch(&pMeta->lock); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); @@ -443,10 +444,16 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { taosArrayDestroy(pRecycleList); return -1; } - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeStreamTask(&decoder, pTask); - tDecoderClear(&decoder); + if (tDecodeStreamTask(&decoder, pTask) < 0) { + tDecoderClear(&decoder); + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + taosArrayDestroy(pRecycleList); + tFreeStreamTask(pTask); + return -1; + } if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { int32_t taskId = pTask->id.taskId; @@ -500,13 +507,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } if (taosArrayGetSize(pRecycleList) > 0) { - for(int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { - int32_t taskId = *(int32_t*) taosArrayGet(pRecycleList, i); + for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pRecycleList, i); streamMetaRemoveTask(pMeta, taskId); } } - qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t) taosArrayGetSize(pMeta->pTaskList)); + qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t)taosArrayGetSize(pMeta->pTaskList)); taosArrayDestroy(pRecycleList); return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1eb8d11916..dc4e5ff4a6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -26,13 +26,14 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { return 0; } -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) { +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, + SArray* pTaskList) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - + pTask->ver = SSTREAM_TASK_VER; pTask->id.taskId = tGenIdPI32(); pTask->id.streamId = streamId; pTask->info.taskLevel = taskLevel; @@ -72,6 +73,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; @@ -135,6 +137,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; + if (pTask->ver != SSTREAM_TASK_VER) return -1; + if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; @@ -163,7 +168,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; - int32_t epSz; + int32_t epSz = -1; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES);