diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h index a4dead7475..5436442284 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streammsg.h @@ -16,6 +16,8 @@ #ifndef TDENGINE_STREAMMSG_H #define TDENGINE_STREAMMSG_H +#include "tmsg.h" + #ifdef __cplusplus extern "C" { #endif @@ -45,6 +47,9 @@ typedef struct { int64_t expireTime; } SStreamCheckpointSourceReq; +int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); +int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); + typedef struct { int64_t streamId; int64_t checkpointId; @@ -55,9 +60,6 @@ typedef struct { int8_t success; } SStreamCheckpointSourceRsp; -int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); -int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); - int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); typedef struct SStreamTaskNodeUpdateMsg { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 87b72bdead..9223aa3c7a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,15 +60,15 @@ int32_t tsTimeToGetAvailableConn = 500000; int32_t tsKeepAliveIdle = 60; int32_t tsNumOfCommitThreads = 2; -int32_t tsNumOfTaskQueueThreads = 10; -int32_t tsNumOfMnodeQueryThreads = 4; +int32_t tsNumOfTaskQueueThreads = 16; +int32_t tsNumOfMnodeQueryThreads = 16; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; -int32_t tsNumOfVnodeQueryThreads = 4; +int32_t tsNumOfVnodeQueryThreads = 16; float tsRatioOfVnodeStreamThreads = 0.5F; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; -int32_t tsNumOfQnodeQueryThreads = 4; +int32_t tsNumOfQnodeQueryThreads = 16; int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; @@ -554,7 +554,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores; - tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 10); + tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16); if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1; @@ -645,7 +645,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { return -1; tsNumOfVnodeQueryThreads = tsNumOfCores * 2; - tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); + tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 16); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -666,7 +666,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { return -1; tsNumOfQnodeQueryThreads = tsNumOfCores * 2; - tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); + tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 16); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -918,7 +918,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(tsCfg, "numOfVnodeQueryThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfVnodeQueryThreads = numOfCores * 2; - tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); + tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 16); pItem->i32 = tsNumOfVnodeQueryThreads; pItem->stype = stype; } @@ -948,7 +948,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(tsCfg, "numOfQnodeQueryThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfQnodeQueryThreads = numOfCores * 2; - tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); + tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 16); pItem->i32 = tsNumOfQnodeQueryThreads; pItem->stype = stype; } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5f90618d6d..c578b95c1a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -895,7 +895,6 @@ typedef enum { } EExecMode; typedef struct { - int64_t version; SRowKey rowKey; int8_t dirty; SColVal colVal; @@ -909,20 +908,20 @@ typedef struct { uint32_t nData; }; }; -} SValueV1; +} SValueV0; typedef struct { int16_t cid; int8_t type; int8_t flag; - SValueV1 value; -} SColValV1; + SValueV0 value; +} SColValV0; typedef struct { TSKEY ts; int8_t dirty; - SColValV1 colVal; -} SLastColV1; + SColValV0 colVal; +} SLastColV0; int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index cc9d06b550..7556f993aa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -130,18 +130,19 @@ static void tsdbClosePgCache(STsdb *pTsdb) { enum { LFLAG_LAST_ROW = 0, LFLAG_LAST = 1, + LFLAG_VERSION = 1 << 2, + LFLAG_VERSION_BITS = (1 << 2 | 1 << 3), LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK, }; +#define LAST_KEY_HAS_VERSION ((k).lflag & LFLAG_VERSION_BITS) + typedef struct { tb_uid_t uid; int16_t cid; int8_t lflag; } SLastKey; -#define LAST_COL_VERSION_BASE (((int64_t)(0x1)) << 63) -#define LAST_COL_VERSION (LAST_COL_VERSION_BASE + 2) - #define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY) #define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW) #define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST) @@ -180,9 +181,9 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t return 1; } - if (lhs->lflag < rhs->lflag) { + if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) { return -1; - } else if (lhs->lflag > rhs->lflag) { + } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) { return 1; } @@ -336,40 +337,39 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { } // note: new object do not own colVal's resource, just copy the pointer -static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) { +static SLastCol *tsdbCacheConvertLastColV0(SLastColV0 *pLastColV0) { SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); if (pLastCol == NULL) return NULL; - pLastCol->version = LAST_COL_VERSION; - pLastCol->rowKey.ts = pLastColV1->ts; + pLastCol->rowKey.ts = pLastColV0->ts; pLastCol->rowKey.numOfPKs = 0; - pLastCol->dirty = pLastColV1->dirty; - pLastCol->colVal.cid = pLastColV1->colVal.cid; - pLastCol->colVal.flag = pLastColV1->colVal.flag; - pLastCol->colVal.value.type = pLastColV1->colVal.type; - pLastCol->colVal.value.val = pLastColV1->colVal.value.val; + pLastCol->dirty = pLastColV0->dirty; + pLastCol->colVal.cid = pLastColV0->colVal.cid; + pLastCol->colVal.flag = pLastColV0->colVal.flag; + pLastCol->colVal.value.type = pLastColV0->colVal.type; + pLastCol->colVal.value.val = pLastColV0->colVal.value.val; return pLastCol; } -static SLastCol *tsdbCacheDeserializeV1(char const *value) { +static SLastCol *tsdbCacheDeserializeV0(char const *value) { if (!value) { return NULL; } - SLastColV1 *pLastColV1 = (SLastColV1 *)value; - SColValV1 *pColVal = &pLastColV1->colVal; + SLastColV0 *pLastColV0 = (SLastColV0 *)value; + SColValV0 *pColVal = &pLastColV0->colVal; if (IS_VAR_DATA_TYPE(pColVal->type)) { if (pColVal->value.nData > 0) { - pColVal->value.pData = (char *)value + sizeof(*pLastColV1); + pColVal->value.pData = (char *)value + sizeof(*pLastColV0); } else { pColVal->value.pData = NULL; } } - return tsdbCacheConvertLastColV1(pLastColV1); + return tsdbCacheConvertLastColV0(pLastColV0); } -static SLastCol *tsdbCacheDeserializeV2(char const *value) { +static SLastCol *tsdbCacheDeserializeV1(char const *value) { if (!value) { return NULL; } @@ -403,16 +403,26 @@ static SLastCol *tsdbCacheDeserializeV2(char const *value) { return pLastCol; } -static SLastCol *tsdbCacheDeserialize(char const *value) { +static SLastCol *tsdbCacheDeserialize(char const *value, int8_t lflag) { if (!value) { return NULL; } - bool hasVersion = ((*(int64_t *)value) & LAST_COL_VERSION_BASE) == LAST_COL_VERSION_BASE; - if (!hasVersion) { - return tsdbCacheDeserializeV1(value); + int8_t version = lflag & LFLAG_VERSION_BITS; + + SLastCol *lastCol = NULL; + switch (version) { + case 0: + lastCol = tsdbCacheDeserializeV0(value); + break; + case LFLAG_VERSION: + lastCol = tsdbCacheDeserializeV1(value); + break; + defalut: + tsdbError("invalid last key version %" PRId8 " , lflag:%" PRId8, version, lflag); + break; } - return tsdbCacheDeserializeV2(value); + return lastCol; } static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { @@ -451,7 +461,7 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { SValue *pToValue = &pToLastCol->rowKey.pks[i]; pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); - } + } } // copy var data value @@ -571,8 +581,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i SRowKey noneRowKey = {0}; noneRowKey.ts = TSKEY_MIN; noneRowKey.numOfPKs = 0; - SLastCol noneCol = { - .version = LAST_COL_VERSION, .rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; + SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; SLastCol *pLastCol = &noneCol; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); @@ -594,7 +603,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i charge += pLastCol->colVal.value.nData; } - SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; + SLastKey *pLastKey = &(SLastKey){.lflag = lflag | LFLAG_VERSION, .uid = uid, .cid = cid}; LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); if (status != TAOS_LRU_STATUS_OK) { @@ -647,8 +656,8 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0; char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid}; keys_list[0] = keys; keys_list[1] = keys + sizeof(SLastKey); @@ -672,13 +681,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0]); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], ((SLastKey*)keys_list[0])->lflag); if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[0], klen); } taosMemoryFreeClear(pLastCol); - pLastCol = tsdbCacheDeserialize(values_list[1]); + pLastCol = tsdbCacheDeserialize(values_list[1], ((SLastKey*)keys_list[1])->lflag); if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[1], klen); } @@ -935,7 +944,6 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal int nData = 0; // update rowkey - pLastCol->version = LAST_COL_VERSION; pLastCol->rowKey.ts = pRowKey->ts; pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs; for (int8_t i = 0; i < pRowKey->numOfPKs; i++) { @@ -1023,7 +1031,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); int16_t cid = pColVal->cid; - SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; + SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid}; size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { @@ -1041,7 +1049,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } if (COL_VAL_IS_VALUE(pColVal)) { - key->lflag = lflag | LFLAG_LAST; + key->lflag = lflag | LFLAG_LAST | LFLAG_VERSION; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); @@ -1079,9 +1087,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_free(errs[i]); } taosMemoryFree(errs); - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list_sizes); rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { @@ -1089,18 +1094,21 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag); SLastCol *PToFree = pLastCol; if (IS_LAST_ROW_KEY(idxKey->key)) { - int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); + int32_t cmp_res = 1; + if (pLastCol) { + cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); + } + if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value, + tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); - // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); @@ -1139,10 +1147,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value, - &vlen); + tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); - // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); @@ -1185,7 +1191,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksMayWrite(pTsdb, true, false, true); + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); taosArrayDestroy(remainCols); } @@ -1392,7 +1401,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr SIdxKey *idxKey = taosArrayGet(remainCols, 0); if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { - SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; + SLastKey *key = &(SLastKey){.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); } @@ -1461,8 +1470,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } // still null, then make up a none col value - SLastCol noneCol = {.version = LAST_COL_VERSION, - .rowKey.ts = TSKEY_MIN, + SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; if (!pLastCol) { pLastCol = &noneCol; @@ -1559,14 +1567,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA rocksdb_free(errs[i]); } } - taosMemoryFree(key_list); - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(errs); SLRUCache *pCache = pTsdb->lruCache; for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag); SLastCol *PToFree = pLastCol; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; if (pLastCol) { @@ -1608,6 +1612,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA } } + taosMemoryFree(errs); + taosMemoryFree(key_list); + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); @@ -1629,7 +1637,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache for (int i = 0; i < num_keys; ++i) { int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; - SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid}; + SLastKey key = {.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = cid}; // for select last_row, last case int32_t funcType = FUNCTION_TYPE_CACHE_LAST; if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) { @@ -1653,8 +1661,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache taosLRUCacheRelease(pCache, h, false); } else { - SLastCol noneCol = {.version = LAST_COL_VERSION, - .rowKey.ts = TSKEY_MIN, + SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; taosArrayPush(pLastArray, &noneCol); @@ -1728,8 +1735,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE int16_t cid = pTSchema->columns[i].colId; char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid}; keys_list[i] = keys; keys_list[num_keys + i] = keys + sizeof(SLastKey); @@ -1759,14 +1766,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey *)keys_list[i])->lflag); taosThreadMutexLock(&pTsdb->rCache.rMutex); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } taosMemoryFreeClear(pLastCol); - pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); + pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], ((SLastKey *)keys_list[i + num_keys])->lflag); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } @@ -3292,8 +3299,7 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, for (int32_t i = 0; i < nCols; ++i) { int16_t slotId = slotIds[i]; - SLastCol col = {.version = LAST_COL_VERSION, - .rowKey.ts = 0, + SLastCol col = {.rowKey.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; taosArrayPush(pColArray, &col); } @@ -3399,12 +3405,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}); + taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); @@ -3454,7 +3460,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { - SLastCol lastCol = {.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; + SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); taosMemoryFree(pLastCol->colVal.value.pData); @@ -3578,12 +3584,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}); + taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b53989e59e..38d3fa8e96 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4320,6 +4320,7 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; @@ -4339,6 +4340,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { return code; } } + tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo); tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index cd1a858175..daac98bbfc 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -38,11 +38,11 @@ typedef struct SSortMemFileRegion { int32_t bufRegOffset; int32_t bufLen; - char* buf; + char* buf; } SSortMemFileRegion; typedef struct SSortMemFile { - char* writeBuf; + char* writeBuf; int32_t writeBufSize; int64_t writeFileOffset; @@ -55,7 +55,7 @@ typedef struct SSortMemFile { int32_t blockSize; FILE* pTdFile; - char memFilePath[PATH_MAX]; + char memFilePath[PATH_MAX]; } SSortMemFile; struct SSortHandle { @@ -260,6 +260,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.cmpGroupId = false; pSortHandle->cmpParam.sortType = type; + if (type == SORT_BLOCK_TS_MERGE) { SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0); pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId; @@ -522,10 +523,9 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); - bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); + bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); if (isNull) { colDataSetVal(pColInfo, pBlock->info.rows, NULL, true); } else { @@ -557,7 +557,9 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); } else { - if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex); + if (pSource->pageIndex % 512 == 0) { + qDebug("begin source %p page %d", pSource, pSource->pageIndex); + } int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -635,7 +637,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa // TODO: improve this function performance -int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, +int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) { SBlockOrderInfo* pOrder = pCompareOrder; SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); @@ -680,7 +682,7 @@ int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, left1 = colDataGetData(pLeftColInfoData, leftRowIndex); right1 = colDataGetData(pRightColInfoData, rightRowIndex); __compar_fn_t fn = pOrder->compFn; - int ret = fn(left1, right1); + int32_t ret = fn(left1, right1); return ret; } @@ -719,7 +721,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { int64_t* leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex; int64_t* rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex; - int ret = pParam->cmpTsFn(leftTs, rightTs); + int32_t ret = pParam->cmpTsFn(leftTs, rightTs); if (ret == 0 && pParam->pPkOrder) { ret = tsortComparBlockCell(pLeftBlock, pRightBlock, pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder); @@ -782,7 +784,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { pOrder->compFn = fn; } - int ret = fn(left1, right1); + int32_t ret = fn(left1, right1); if (ret == 0) { continue; } else { @@ -855,7 +857,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return code; } - int nMergedRows = 0; + int32_t nMergedRows = 0; SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { @@ -1075,7 +1077,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i } taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); - int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); + int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); if (ret != 1) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; @@ -1095,7 +1097,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock); taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); - int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); + int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); if (ret != 1) { taosMemoryFreeClear(*ppRow); terrno = TAOS_SYSTEM_ERROR(errno); @@ -1214,7 +1216,7 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) { pRegion->regionSize = pMemFile->currRegionOffset; int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); if (writeBytes > 0) { - int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); + int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); if (ret != 1) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; @@ -1241,13 +1243,15 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) { return TSDB_CODE_SUCCESS; } -static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) { +static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, + int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); { if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); - int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); + int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); if (ret != 1) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; @@ -1255,11 +1259,13 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset; } } + *pRegionId = pMemFile->currRegionId; *pOffset = pMemFile->currRegionOffset; int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset); *pLength = blockLen; + pMemFile->currRegionOffset += blockLen; pMemFile->bRegionDirty = true; return TSDB_CODE_SUCCESS; @@ -1317,27 +1323,30 @@ static void initRowIdSort(SSortHandle* pHandle) { blockDataAppendColInfo(pSortInput, &offsetCol); SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); blockDataAppendColInfo(pSortInput, &lengthCol); + if (pHandle->bSortPk) { pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); blockDataAppendColInfo(pSortInput, &pkCol); } + blockDataDestroy(pHandle->pDataBlock); pHandle->pDataBlock = pSortInput; - int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); - size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); +// int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); +// size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); pHandle->pageSize = 256 * 1024; // 256k pHandle->numOfPages = 256; - SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo)); + SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo)); + + int32_t tsOrder = ((SBlockOrderInfo*)taosArrayGet(pHandle->pSortInfo, 0))->order; - SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlockOrderInfo biTs = {0}; - biTs.order = pTsOrder->order; + biTs.order = tsOrder; biTs.slotId = 0; biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC); biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); - taosArrayPush(aOrder, &biTs); + taosArrayPush(pOrderInfoList, &biTs); if (pHandle->bSortPk) { SBlockOrderInfo biPk = {0}; @@ -1345,11 +1354,11 @@ static void initRowIdSort(SSortHandle* pHandle) { biPk.slotId = 4; biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC); biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order); - taosArrayPush(aOrder, &biPk); + taosArrayPush(pOrderInfoList, &biPk); } + taosArrayDestroy(pHandle->pSortInfo); - pHandle->pSortInfo = aOrder; - return; + pHandle->pSortInfo = pOrderInfoList; } int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) { @@ -1440,128 +1449,224 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, return 0; } -static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) { - int sz = 0; - int numCols = taosArrayGetSize(blk->pDataBlock); - if (!blk->info.hasVarCol) { - sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0); - sz += blockDataGetRowSize(blk); +static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) { + int32_t size = 0; + int32_t numCols = taosArrayGetSize(pSrcBlock->pDataBlock); + + if (!pSrcBlock->info.hasVarCol) { + size += numCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); + size += blockDataGetRowSize(pSrcBlock); } else { for (int32_t i = 0; i < numCols; ++i) { - SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i); + SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - if ((pColInfoData->varmeta.offset[row] != -1) && (pColInfoData->pData)) { - char* p = colDataGetData(pColInfoData, row); - sz += varDataTLen(p); + if ((pColInfoData->varmeta.offset[srcRowIndex] != -1) && (pColInfoData->pData)) { + char* p = colDataGetData(pColInfoData, srcRowIndex); + + if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { + size += getJsonValueLen(p); + } else { + size += varDataTLen(p); + } } - sz += sizeof(pColInfoData->varmeta.offset[0]); + size += sizeof(pColInfoData->varmeta.offset[0]); } else { - sz += pColInfoData->info.bytes; + size += pColInfoData->info.bytes; - if (((rowIdxInPage) & 0x07) == 0) { - sz += 1; // bitmap + if (((dstRowIndex) & 0x07) == 0) { + size += 1; // bitmap } } } } - return sz; + + return size; +} + +static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, + SColumnInfoData* pPkCol) { + int32_t size = 0; + int32_t numOfCols = blockDataGetNumOfCols(pDstBlock); + + if (pPkCol == NULL) { // no var column + ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol)); + + size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); + size += blockDataGetRowSize(pDstBlock); + } else { + ASSERT(numOfCols == 5); + + size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0); + for(int32_t i = 0; i < numOfCols - 1; ++i) { + SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i); + size += pColInfo->info.bytes; + } + + // handle the pk column, the last column, may be the var char column + if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { + if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) { + char* p = colDataGetData(pPkCol, srcRowIndex); + size += varDataTLen(p); + } + + size += sizeof(pPkCol->varmeta.offset[0]); + } else { + size += pPkCol->info.bytes; + if (((dstRowIndex) & 0x07) == 0) { + size += 1; // bitmap + } + } + } + + return size; +} + +static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock, + int32_t srcRowIndex) { + int32_t inc = 0; + + if (pHandle->bSortByRowId) { + SColumnInfoData* pPkCol = NULL; + + // there may be varchar column exists, so we need to get the pk info, and then calculate the row length + if (pHandle->bSortPk) { + SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); + pPkCol = taosArrayGet(pSrcBlock->pDataBlock, extRowsPkOrder->slotId); + } + + inc = getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pPkCol); + } else { + inc = getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex); + } + + return inc; +} + +static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId, SBlockOrderInfo* pPkOrderInfo) { + memset(pSup, 0, sizeof(SBlkMergeSupport)); + + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + + pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t)); + pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*)); + pSup->tsOrder = tsOrder; + pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*)); + + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pBlock = taosArrayGetP(pBlockList, i); + SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId); + pSup->aTs[i] = (int64_t*)col->pData; + pSup->aRowIdx[i] = 0; + pSup->aBlks[i] = pBlock; + } + + pSup->pPkOrder = pPkOrderInfo; + return TSDB_CODE_SUCCESS; +} + +static void cleanupMergeSup(SBlkMergeSupport* pSup) { + taosMemoryFree(pSup->aRowIdx); + taosMemoryFree(pSup->aTs); + taosMemoryFree(pSup->aBlks); +} + +static int32_t getTotalRows(SArray* pBlockList) { + int32_t totalRows = 0; + + for (int32_t i = 0; i < taosArrayGetSize(pBlockList); ++i) { + SSDataBlock* blk = taosArrayGetP(pBlockList, i); + totalRows += blk->info.rows; + } + + return totalRows; } static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { int32_t code = TSDB_CODE_SUCCESS; - int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); - int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); + int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock); + int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize); + blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); blockDataCleanup(pHandle->pDataBlock); - int32_t numBlks = taosArrayGetSize(aBlk); - SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ? - taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); - SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlkMergeSupport sup = {0}; - sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); - sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); - sup.tsOrder = pOrigBlockTsOrder->order; - sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*)); - for (int i = 0; i < numBlks; ++i) { - SSDataBlock* blk = taosArrayGetP(aBlk, i); - SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId); - sup.aTs[i] = (int64_t*)col->pData; - sup.aRowIdx[i] = 0; - sup.aBlks[i] = blk; - } + + SBlockOrderInfo* pOrigBlockTsOrder = + (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); + + SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); + SBlockOrderInfo* pOrigBlockPkOrder = NULL; if (pHandle->bSortPk) { - pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? - taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); + pOrigBlockPkOrder = + (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); } - sup.pPkOrder = pOrigBlockPkOrder; - int32_t totalRows = 0; - for (int i = 0; i < numBlks; ++i) { - SSDataBlock* blk = taosArrayGetP(aBlk, i); - totalRows += blk->info.rows; - } + initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder); + + int32_t totalRows = getTotalRows(aBlk); SMultiwayMergeTreeInfo* pTree = NULL; - __merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn; + __merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn; + code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn); if (TSDB_CODE_SUCCESS != code) { - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; } SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); int32_t nRows = 0; int32_t nMergedRows = 0; - bool mergeLimitReached = false; - size_t blkPgSz = pgHeaderSz; + bool mergeLimitReached = false; + size_t blkPgSz = pageHeaderSize; int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; - int64_t currTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; + while (nRows < totalRows) { - int32_t minIdx = tMergeTreeGetChosenIndex(pTree); + int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); - int32_t minRow = sup.aRowIdx[minIdx]; - SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); + int32_t minRow = sup.aRowIdx[minIdx]; + + int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow); if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { - SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); - lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; - code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pTree); - taosArrayDestroy(aPgId); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); - return code; + SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); + lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; + code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pTree); + taosArrayDestroy(aPgId); + cleanupMergeSup(&sup); + return code; + } + + nMergedRows += pHandle->pDataBlock->info.rows; + blockDataCleanup(pHandle->pDataBlock); + blkPgSz = pageHeaderSize; + + bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow); + + if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { + mergeLimitReached = true; + if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || + (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { + pHandle->currMergeLimitTs = lastPageBufTs; } - nMergedRows += pHandle->pDataBlock->info.rows; - blockDataCleanup(pHandle->pDataBlock); - blkPgSz = pgHeaderSz; - incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - bufInc = getPageBufIncForRow(incBlock, minRow, 0); - - if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { - mergeLimitReached = true; - if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || - (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { - pHandle->currMergeLimitTs = lastPageBufTs; - } - break; - } + + break; + } } + blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); - if (!pHandle->bSortByRowId) { - appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + if (pHandle->bSortByRowId) { + appendToRowIndexDataBlock(pHandle, minBlk, &minRow); } else { - appendToRowIndexDataBlock(pHandle, minBlk, &minRow); + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); } + blkPgSz += bufInc; + ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize); ++nRows; @@ -1572,6 +1677,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); } + if (pHandle->pDataBlock->info.rows > 0) { if (!mergeLimitReached) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); @@ -1580,18 +1686,16 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(aPgId); taosMemoryFree(pTree); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; - } + } nMergedRows += pHandle->pDataBlock->info.rows; if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { - mergeLimitReached = true; - if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || - (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { - pHandle->currMergeLimitTs = lastPageBufTs; - } + mergeLimitReached = true; + if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || + (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { + pHandle->currMergeLimitTs = lastPageBufTs; + } } } blockDataCleanup(pHandle->pDataBlock); @@ -1600,10 +1704,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); - + cleanupMergeSup(&sup); tMergeTreeDestroy(&pTree); return 0; @@ -1724,7 +1825,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); if (code != TSDB_CODE_SUCCESS) { - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayClear(aBlkSort); @@ -1736,7 +1837,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayClear(aBlkSort); @@ -1750,7 +1851,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (tsortIsClosed(pHandle)) { tSimpleHashClear(mUidBlk); - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayClear(aBlkSort); @@ -1759,7 +1860,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } tSimpleHashCleanup(mUidBlk); - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayDestroy(aBlkSort); @@ -2048,10 +2149,10 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) if (!lData) return pOrder->nullFirst ? -1 : 1; if (!rData) return pOrder->nullFirst ? 1 : -1; - int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; + int32_t type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; __compar_fn_t fn = getKeyComparFunc(type, pOrder->order); - int ret = fn(lData, rData); + int32_t ret = fn(lData, rData); if (ret == 0) { continue; } else { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 1401dba820..05cc67e069 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -720,60 +720,3 @@ void rspMonitorFn(void* param, void* tmrId) { taosArrayDestroy(pTimeoutList); } -int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1519382fe4..5a4e3a5439 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -34,92 +34,6 @@ static int32_t streamTaskBackupCheckpoint(const char* id, const char* path); static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); -int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; - if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; - if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; - tEndEncode(pEncoder); - return 0; -} - -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - static int32_t streamAlignCheckpoint(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index f7245acc55..58c6e19581 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -43,67 +43,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->contLen = contLen; } -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; - ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); - ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); - for (int32_t i = 0; i < pReq->blockNum; i++) { - int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); - void* data = taosArrayGetP(pReq->data, i); - if (tEncodeI32(pEncoder, len) < 0) return -1; - if (tEncodeBinary(pEncoder, data, len) < 0) return -1; - } - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; - - ASSERT(pReq->blockNum > 0); - pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); - pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); - for (int32_t i = 0; i < pReq->blockNum; i++) { - int32_t len1; - uint64_t len2; - void* data; - if (tDecodeI32(pDecoder, &len1) < 0) return -1; - if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1; - ASSERT(len1 == len2); - taosArrayPush(pReq->dataLen, &len1); - taosArrayPush(pReq->data, &data); - } - - tEndDecode(pDecoder); - return 0; -} - static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; @@ -129,41 +68,6 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas return TSDB_CODE_SUCCESS; } -void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { - taosArrayDestroyP(pReq->data, taosMemoryFree); - taosArrayDestroy(pReq->dataLen); -} - -int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1; - if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1; - uint64_t len = 0; - if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1; - pReq->retrieveLen = (int32_t)len; - tEndDecode(pDecoder); - return 0; -} - -void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } - void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); @@ -1262,45 +1166,3 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return 0; } -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; - - int32_t size = taosArrayGetSize(pMsg->pNodeList); - if (tEncodeI32(pEncoder, size) < 0) return -1; - - for (int32_t i = 0; i < size; ++i) { - SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i); - if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1; - } - - // todo this new attribute will be result in being incompatible with previous version - if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; - - int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) return -1; - pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); - for (int32_t i = 0; i < size; ++i) { - SNodeUpdateInfo info = {0}; - if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1; - taosArrayPush(pMsg->pNodeList, &info); - } - - if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1; - - tEndDecode(pDecoder); - return 0; -} diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c401141821..becc692a07 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -944,102 +944,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { taosArrayDestroy(pRecycleList); } -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; - - for (int32_t i = 0; i < pReq->numOfTasks; ++i) { - STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); - if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; - if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, ps->status) < 0) return -1; - if (tEncodeI64(pEncoder, ps->stage) < 0) return -1; - if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; - if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1; - if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1; - if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; - if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; - if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; - if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1; - } - - int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); - if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1; - - for (int j = 0; j < numOfVgs; ++j) { - int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j); - if (tEncodeI32(pEncoder, *pVgId) < 0) return -1; - } - - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; - - pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); - for (int32_t i = 0; i < pReq->numOfTasks; ++i) { - int32_t taskId = 0; - STaskStatusEntry entry = {0}; - - if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1; - if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1; - - if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1; - - entry.id.taskId = taskId; - taosArrayPush(pReq->pTaskStatus, &entry); - } - - int32_t numOfVgs = 0; - if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1; - - pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t)); - - for (int j = 0; j < numOfVgs; ++j) { - int32_t vgId = 0; - if (tDecodeI32(pDecoder, &vgId) < 0) return -1; - taosArrayPush(pReq->pUpdateNodes, &vgId); - } - - tEndDecode(pDecoder); - return 0; -} - static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter pInfo->tickCounter = 0; @@ -1048,20 +952,6 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { return false; } -void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { - if (pMsg == NULL) { - return; - } - - if (pMsg->pUpdateNodes != NULL) { - taosArrayDestroy(pMsg->pUpdateNodes); - } - - if (pMsg->pTaskStatus != NULL) { - taosArrayDestroy(pMsg->pTaskStatus); - } -} - static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); for (int k = 0; k < numOfExisted; ++k) { @@ -1734,7 +1624,7 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta bool hasFillhistoryTask = false; STaskId hId = {0}; - stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); + stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); streamMetaRLock(pMeta); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7f5ea52f58..1e622f615d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -137,179 +137,6 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, return pTask; } -int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) { - if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; - if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1; - return 0; -} - -int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { - if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; - if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1; - return 0; -} - -int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1; - if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; - - if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1; - - if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1; - - if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; - - if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1; - int32_t taskId = pTask->hTaskInfo.id.taskId; - if (tEncodeI32(pEncoder, taskId)) return -1; - - if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; - taskId = pTask->streamTaskId.taskId; - if (tEncodeI32(pEncoder, taskId)) return -1; - - if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; - if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; - if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; - if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; - - int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); - if (tEncodeI32(pEncoder, epSz) < 0) return -1; - for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; - if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; - } - if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; - - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - int32_t taskId = 0; - - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; - - if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1; - if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; - - if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1; - - if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1; - - if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; - - if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1; - if (tDecodeI32(pDecoder, &taskId)) return -1; - pTask->hTaskInfo.id.taskId = taskId; - - if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; - if (tDecodeI32(pDecoder, &taskId)) return -1; - pTask->streamTaskId.taskId = taskId; - - if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1; - if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1; - if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; - if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; - - int32_t epSz = -1; - if (tDecodeI32(pDecoder, &epSz) < 0) return -1; - - pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES); - for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); - if (pInfo == NULL) return -1; - if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { - taosMemoryFreeClear(pInfo); - return -1; - } - taosArrayPush(pTask->upstreamInfo.pList, &pInfo); - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; - pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; - } - if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; - } - if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; - - tEndDecode(pDecoder); - return 0; -} - int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { int64_t skip64; int8_t skip8; diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c new file mode 100644 index 0000000000..5e52b927c6 --- /dev/null +++ b/source/libs/stream/src/streammsg.c @@ -0,0 +1,587 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "streammsg.h" +#include "tstream.h" + +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) { + if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; + if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1; + return 0; +} + +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { + if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; + if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1; + return 0; +} + +int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + + +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; + + int32_t size = taosArrayGetSize(pMsg->pNodeList); + if (tEncodeI32(pEncoder, size) < 0) return -1; + + for (int32_t i = 0; i < size; ++i) { + SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i); + if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1; + } + + // todo this new attribute will be result in being incompatible with previous version + if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + + +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; + + int32_t size = 0; + if (tDecodeI32(pDecoder, &size) < 0) return -1; + pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); + for (int32_t i = 0; i < size; ++i) { + SNodeUpdateInfo info = {0}; + if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1; + taosArrayPush(pMsg->pNodeList, &info); + } + + if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + + +int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; + ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); + ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); + void* data = taosArrayGetP(pReq->data, i); + if (tEncodeI32(pEncoder, len) < 0) return -1; + if (tEncodeBinary(pEncoder, data, len) < 0) return -1; + } + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; + + ASSERT(pReq->blockNum > 0); + pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); + pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t len1; + uint64_t len2; + void* data; + if (tDecodeI32(pDecoder, &len1) < 0) return -1; + if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1; + ASSERT(len1 == len2); + taosArrayPush(pReq->dataLen, &len1); + taosArrayPush(pReq->data, &data); + } + + tEndDecode(pDecoder); + return 0; +} + +void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { + taosArrayDestroyP(pReq->data, taosMemoryFree); + taosArrayDestroy(pReq->dataLen); +} + +int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1; + if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1; + uint64_t len = 0; + if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1; + pReq->retrieveLen = (int32_t)len; + tEndDecode(pDecoder); + return 0; +} + +void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } + +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; + + for (int32_t i = 0; i < pReq->numOfTasks; ++i) { + STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); + if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; + if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, ps->status) < 0) return -1; + if (tEncodeI64(pEncoder, ps->stage) < 0) return -1; + if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; + if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1; + if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1; + if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1; + } + + int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); + if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1; + + for (int j = 0; j < numOfVgs; ++j) { + int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j); + if (tEncodeI32(pEncoder, *pVgId) < 0) return -1; + } + + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; + + pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); + for (int32_t i = 0; i < pReq->numOfTasks; ++i) { + int32_t taskId = 0; + STaskStatusEntry entry = {0}; + + if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1; + if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1; + + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1; + + entry.id.taskId = taskId; + taosArrayPush(pReq->pTaskStatus, &entry); + } + + int32_t numOfVgs = 0; + if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1; + + pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t)); + + for (int j = 0; j < numOfVgs; ++j) { + int32_t vgId = 0; + if (tDecodeI32(pDecoder, &vgId) < 0) return -1; + taosArrayPush(pReq->pUpdateNodes, &vgId); + } + + tEndDecode(pDecoder); + return 0; +} + +void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { + if (pMsg == NULL) { + return; + } + + if (pMsg->pUpdateNodes != NULL) { + taosArrayDestroy(pMsg->pUpdateNodes); + } + + if (pMsg->pTaskStatus != NULL) { + taosArrayDestroy(pMsg->pTaskStatus); + } +} + +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1; + if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; + + if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1; + + if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1; + + if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; + + if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1; + int32_t taskId = pTask->hTaskInfo.id.taskId; + if (tEncodeI32(pEncoder, taskId)) return -1; + + if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; + taskId = pTask->streamTaskId.taskId; + if (tEncodeI32(pEncoder, taskId)) return -1; + + if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; + if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; + + int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); + if (tEncodeI32(pEncoder, epSz) < 0) return -1; + for (int32_t i = 0; i < epSz; i++) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; + if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; + } + if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; + + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + int32_t taskId = 0; + + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; + + if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1; + if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; + + if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1; + + if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1; + + if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; + + if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1; + if (tDecodeI32(pDecoder, &taskId)) return -1; + pTask->hTaskInfo.id.taskId = taskId; + + if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; + if (tDecodeI32(pDecoder, &taskId)) return -1; + pTask->streamTaskId.taskId = taskId; + + if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1; + if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; + + int32_t epSz = -1; + if (tDecodeI32(pDecoder, &epSz) < 0) return -1; + + pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES); + for (int32_t i = 0; i < epSz; i++) { + SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); + if (pInfo == NULL) return -1; + if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { + taosMemoryFreeClear(pInfo); + return -1; + } + taosArrayPush(pTask->upstreamInfo.pList, &pInfo); + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; + pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; + } + if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + } + if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} \ No newline at end of file