This commit is contained in:
chenhaoran 2024-05-09 14:15:47 +08:00
commit 69e3089169
12 changed files with 905 additions and 772 deletions

View File

@ -16,6 +16,8 @@
#ifndef TDENGINE_STREAMMSG_H
#define TDENGINE_STREAMMSG_H
#include "tmsg.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -45,6 +47,9 @@ typedef struct {
int64_t expireTime;
} SStreamCheckpointSourceReq;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
typedef struct {
int64_t streamId;
int64_t checkpointId;
@ -55,9 +60,6 @@ typedef struct {
int8_t success;
} SStreamCheckpointSourceRsp;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
typedef struct SStreamTaskNodeUpdateMsg {

View File

@ -60,15 +60,15 @@ int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 10;
int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfTaskQueueThreads = 16;
int32_t tsNumOfMnodeQueryThreads = 16;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4;
int32_t tsNumOfVnodeQueryThreads = 16;
float tsRatioOfVnodeStreamThreads = 0.5F;
int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeQueryThreads = 16;
int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1;
@ -554,7 +554,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 10);
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16);
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0)
return -1;
@ -645,7 +645,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
return -1;
tsNumOfVnodeQueryThreads = tsNumOfCores * 2;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 16);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) !=
0)
return -1;
@ -666,7 +666,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
return -1;
tsNumOfQnodeQueryThreads = tsNumOfCores * 2;
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 16);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) !=
0)
return -1;
@ -918,7 +918,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(tsCfg, "numOfVnodeQueryThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeQueryThreads = numOfCores * 2;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 16);
pItem->i32 = tsNumOfVnodeQueryThreads;
pItem->stype = stype;
}
@ -948,7 +948,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(tsCfg, "numOfQnodeQueryThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeQueryThreads = numOfCores * 2;
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 16);
pItem->i32 = tsNumOfQnodeQueryThreads;
pItem->stype = stype;
}

View File

@ -895,7 +895,6 @@ typedef enum {
} EExecMode;
typedef struct {
int64_t version;
SRowKey rowKey;
int8_t dirty;
SColVal colVal;
@ -909,20 +908,20 @@ typedef struct {
uint32_t nData;
};
};
} SValueV1;
} SValueV0;
typedef struct {
int16_t cid;
int8_t type;
int8_t flag;
SValueV1 value;
} SColValV1;
SValueV0 value;
} SColValV0;
typedef struct {
TSKEY ts;
int8_t dirty;
SColValV1 colVal;
} SLastColV1;
SColValV0 colVal;
} SLastColV0;
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);

View File

@ -130,18 +130,19 @@ static void tsdbClosePgCache(STsdb *pTsdb) {
enum {
LFLAG_LAST_ROW = 0,
LFLAG_LAST = 1,
LFLAG_VERSION = 1 << 2,
LFLAG_VERSION_BITS = (1 << 2 | 1 << 3),
LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK,
};
#define LAST_KEY_HAS_VERSION ((k).lflag & LFLAG_VERSION_BITS)
typedef struct {
tb_uid_t uid;
int16_t cid;
int8_t lflag;
} SLastKey;
#define LAST_COL_VERSION_BASE (((int64_t)(0x1)) << 63)
#define LAST_COL_VERSION (LAST_COL_VERSION_BASE + 2)
#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY)
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
#define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
@ -180,9 +181,9 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t
return 1;
}
if (lhs->lflag < rhs->lflag) {
if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
return -1;
} else if (lhs->lflag > rhs->lflag) {
} else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
return 1;
}
@ -336,40 +337,39 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
}
// note: new object do not own colVal's resource, just copy the pointer
static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) {
static SLastCol *tsdbCacheConvertLastColV0(SLastColV0 *pLastColV0) {
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (pLastCol == NULL) return NULL;
pLastCol->version = LAST_COL_VERSION;
pLastCol->rowKey.ts = pLastColV1->ts;
pLastCol->rowKey.ts = pLastColV0->ts;
pLastCol->rowKey.numOfPKs = 0;
pLastCol->dirty = pLastColV1->dirty;
pLastCol->colVal.cid = pLastColV1->colVal.cid;
pLastCol->colVal.flag = pLastColV1->colVal.flag;
pLastCol->colVal.value.type = pLastColV1->colVal.type;
pLastCol->colVal.value.val = pLastColV1->colVal.value.val;
pLastCol->dirty = pLastColV0->dirty;
pLastCol->colVal.cid = pLastColV0->colVal.cid;
pLastCol->colVal.flag = pLastColV0->colVal.flag;
pLastCol->colVal.value.type = pLastColV0->colVal.type;
pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
return pLastCol;
}
static SLastCol *tsdbCacheDeserializeV1(char const *value) {
static SLastCol *tsdbCacheDeserializeV0(char const *value) {
if (!value) {
return NULL;
}
SLastColV1 *pLastColV1 = (SLastColV1 *)value;
SColValV1 *pColVal = &pLastColV1->colVal;
SLastColV0 *pLastColV0 = (SLastColV0 *)value;
SColValV0 *pColVal = &pLastColV0->colVal;
if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (pColVal->value.nData > 0) {
pColVal->value.pData = (char *)value + sizeof(*pLastColV1);
pColVal->value.pData = (char *)value + sizeof(*pLastColV0);
} else {
pColVal->value.pData = NULL;
}
}
return tsdbCacheConvertLastColV1(pLastColV1);
return tsdbCacheConvertLastColV0(pLastColV0);
}
static SLastCol *tsdbCacheDeserializeV2(char const *value) {
static SLastCol *tsdbCacheDeserializeV1(char const *value) {
if (!value) {
return NULL;
}
@ -403,16 +403,26 @@ static SLastCol *tsdbCacheDeserializeV2(char const *value) {
return pLastCol;
}
static SLastCol *tsdbCacheDeserialize(char const *value) {
static SLastCol *tsdbCacheDeserialize(char const *value, int8_t lflag) {
if (!value) {
return NULL;
}
bool hasVersion = ((*(int64_t *)value) & LAST_COL_VERSION_BASE) == LAST_COL_VERSION_BASE;
if (!hasVersion) {
return tsdbCacheDeserializeV1(value);
int8_t version = lflag & LFLAG_VERSION_BITS;
SLastCol *lastCol = NULL;
switch (version) {
case 0:
lastCol = tsdbCacheDeserializeV0(value);
break;
case LFLAG_VERSION:
lastCol = tsdbCacheDeserializeV1(value);
break;
defalut:
tsdbError("invalid last key version %" PRId8 " , lflag:%" PRId8, version, lflag);
break;
}
return tsdbCacheDeserializeV2(value);
return lastCol;
}
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
@ -451,7 +461,7 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
SValue *pToValue = &pToLastCol->rowKey.pks[i];
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
}
}
}
// copy var data value
@ -571,8 +581,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
SRowKey noneRowKey = {0};
noneRowKey.ts = TSKEY_MIN;
noneRowKey.numOfPKs = 0;
SLastCol noneCol = {
.version = LAST_COL_VERSION, .rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol *pLastCol = &noneCol;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
@ -594,7 +603,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
charge += pLastCol->colVal.value.nData;
}
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
SLastKey *pLastKey = &(SLastKey){.lflag = lflag | LFLAG_VERSION, .uid = uid, .cid = cid};
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
@ -647,8 +656,8 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0;
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
keys_list[0] = keys;
keys_list[1] = keys + sizeof(SLastKey);
@ -672,13 +681,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
{
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0]);
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], ((SLastKey*)keys_list[0])->lflag);
if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[0], klen);
}
taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[1]);
pLastCol = tsdbCacheDeserialize(values_list[1], ((SLastKey*)keys_list[1])->lflag);
if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[1], klen);
}
@ -935,7 +944,6 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal
int nData = 0;
// update rowkey
pLastCol->version = LAST_COL_VERSION;
pLastCol->rowKey.ts = pRowKey->ts;
pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs;
for (int8_t i = 0; i < pRowKey->numOfPKs; i++) {
@ -1023,7 +1031,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
int16_t cid = pColVal->cid;
SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
size_t klen = ROCKS_KEY_LEN;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) {
@ -1041,7 +1049,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
}
if (COL_VAL_IS_VALUE(pColVal)) {
key->lflag = lflag | LFLAG_LAST;
key->lflag = lflag | LFLAG_LAST | LFLAG_VERSION;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
@ -1079,9 +1087,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_free(errs[i]);
}
taosMemoryFree(errs);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list_sizes);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) {
@ -1089,18 +1094,21 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag);
SLastCol *PToFree = pLastCol;
if (IS_LAST_ROW_KEY(idxKey->key)) {
int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
int32_t cmp_res = 1;
if (pLastCol) {
cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
}
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value,
tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value,
&vlen);
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
@ -1139,10 +1147,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value,
&vlen);
tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
@ -1185,7 +1191,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksMayWrite(pTsdb, true, false, true);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
taosArrayDestroy(remainCols);
}
@ -1392,7 +1401,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
SIdxKey *idxKey = taosArrayGet(remainCols, 0);
if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
SLastKey *key = &(SLastKey){.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key});
}
@ -1461,8 +1470,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
}
// still null, then make up a none col value
SLastCol noneCol = {.version = LAST_COL_VERSION,
.rowKey.ts = TSKEY_MIN,
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) {
pLastCol = &noneCol;
@ -1559,14 +1567,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
rocksdb_free(errs[i]);
}
}
taosMemoryFree(key_list);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs);
SLRUCache *pCache = pTsdb->lruCache;
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag);
SLastCol *PToFree = pLastCol;
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
if (pLastCol) {
@ -1608,6 +1612,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
}
}
taosMemoryFree(errs);
taosMemoryFree(key_list);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
@ -1629,7 +1637,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
for (int i = 0; i < num_keys; ++i) {
int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
SLastKey key = {.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = cid};
// for select last_row, last case
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
@ -1653,8 +1661,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
taosLRUCacheRelease(pCache, h, false);
} else {
SLastCol noneCol = {.version = LAST_COL_VERSION,
.rowKey.ts = TSKEY_MIN,
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
taosArrayPush(pLastArray, &noneCol);
@ -1728,8 +1735,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
int16_t cid = pTSchema->columns[i].colId;
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
keys_list[i] = keys;
keys_list[num_keys + i] = keys + sizeof(SLastKey);
@ -1759,14 +1766,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey *)keys_list[i])->lflag);
taosThreadMutexLock(&pTsdb->rCache.rMutex);
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen);
}
taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], ((SLastKey *)keys_list[i + num_keys])->lflag);
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
}
@ -3292,8 +3299,7 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray,
for (int32_t i = 0; i < nCols; ++i) {
int16_t slotId = slotIds[i];
SLastCol col = {.version = LAST_COL_VERSION,
.rowKey.ts = 0,
SLastCol col = {.rowKey.ts = 0,
.colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
taosArrayPush(pColArray, &col);
}
@ -3399,12 +3405,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal});
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
@ -3454,7 +3460,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
SLastCol lastCol = {.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
@ -3578,12 +3584,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal});
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);

View File

@ -4320,6 +4320,7 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
@ -4339,6 +4340,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
return code;
}
}
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);

View File

@ -38,11 +38,11 @@ typedef struct SSortMemFileRegion {
int32_t bufRegOffset;
int32_t bufLen;
char* buf;
char* buf;
} SSortMemFileRegion;
typedef struct SSortMemFile {
char* writeBuf;
char* writeBuf;
int32_t writeBufSize;
int64_t writeFileOffset;
@ -55,7 +55,7 @@ typedef struct SSortMemFile {
int32_t blockSize;
FILE* pTdFile;
char memFilePath[PATH_MAX];
char memFilePath[PATH_MAX];
} SSortMemFile;
struct SSortHandle {
@ -260,6 +260,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->cmpParam.cmpGroupId = false;
pSortHandle->cmpParam.sortType = type;
if (type == SORT_BLOCK_TS_MERGE) {
SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0);
pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId;
@ -522,10 +523,9 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
if (isNull) {
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
} else {
@ -557,7 +557,9 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
} else {
if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex);
if (pSource->pageIndex % 512 == 0) {
qDebug("begin source %p page %d", pSource, pSource->pageIndex);
}
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
@ -635,7 +637,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa
// TODO: improve this function performance
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) {
SBlockOrderInfo* pOrder = pCompareOrder;
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
@ -680,7 +682,7 @@ int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
left1 = colDataGetData(pLeftColInfoData, leftRowIndex);
right1 = colDataGetData(pRightColInfoData, rightRowIndex);
__compar_fn_t fn = pOrder->compFn;
int ret = fn(left1, right1);
int32_t ret = fn(left1, right1);
return ret;
}
@ -719,7 +721,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
int64_t* leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
int64_t* rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
int ret = pParam->cmpTsFn(leftTs, rightTs);
int32_t ret = pParam->cmpTsFn(leftTs, rightTs);
if (ret == 0 && pParam->pPkOrder) {
ret = tsortComparBlockCell(pLeftBlock, pRightBlock,
pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder);
@ -782,7 +784,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
pOrder->compFn = fn;
}
int ret = fn(left1, right1);
int32_t ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
@ -855,7 +857,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return code;
}
int nMergedRows = 0;
int32_t nMergedRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) {
@ -1075,7 +1077,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
}
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -1095,7 +1097,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
taosMemoryFreeClear(*ppRow);
terrno = TAOS_SYSTEM_ERROR(errno);
@ -1214,7 +1216,7 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) {
pRegion->regionSize = pMemFile->currRegionOffset;
int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset);
if (writeBytes > 0) {
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -1241,13 +1243,15 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
return TSDB_CODE_SUCCESS;
}
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx,
int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
{
if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) {
int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -1255,11 +1259,13 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p
pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset;
}
}
*pRegionId = pMemFile->currRegionId;
*pOffset = pMemFile->currRegionOffset;
int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset);
*pLength = blockLen;
pMemFile->currRegionOffset += blockLen;
pMemFile->bRegionDirty = true;
return TSDB_CODE_SUCCESS;
@ -1317,27 +1323,30 @@ static void initRowIdSort(SSortHandle* pHandle) {
blockDataAppendColInfo(pSortInput, &offsetCol);
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
blockDataAppendColInfo(pSortInput, &lengthCol);
if (pHandle->bSortPk) {
pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
blockDataAppendColInfo(pSortInput, &pkCol);
}
blockDataDestroy(pHandle->pDataBlock);
pHandle->pDataBlock = pSortInput;
int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
// int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
// size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
pHandle->pageSize = 256 * 1024; // 256k
pHandle->numOfPages = 256;
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo));
SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo));
int32_t tsOrder = ((SBlockOrderInfo*)taosArrayGet(pHandle->pSortInfo, 0))->order;
SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlockOrderInfo biTs = {0};
biTs.order = pTsOrder->order;
biTs.order = tsOrder;
biTs.slotId = 0;
biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC);
biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
taosArrayPush(aOrder, &biTs);
taosArrayPush(pOrderInfoList, &biTs);
if (pHandle->bSortPk) {
SBlockOrderInfo biPk = {0};
@ -1345,11 +1354,11 @@ static void initRowIdSort(SSortHandle* pHandle) {
biPk.slotId = 4;
biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC);
biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order);
taosArrayPush(aOrder, &biPk);
taosArrayPush(pOrderInfoList, &biPk);
}
taosArrayDestroy(pHandle->pSortInfo);
pHandle->pSortInfo = aOrder;
return;
pHandle->pSortInfo = pOrderInfoList;
}
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
@ -1440,128 +1449,224 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
return 0;
}
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
int sz = 0;
int numCols = taosArrayGetSize(blk->pDataBlock);
if (!blk->info.hasVarCol) {
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
sz += blockDataGetRowSize(blk);
static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) {
int32_t size = 0;
int32_t numCols = taosArrayGetSize(pSrcBlock->pDataBlock);
if (!pSrcBlock->info.hasVarCol) {
size += numCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
size += blockDataGetRowSize(pSrcBlock);
} else {
for (int32_t i = 0; i < numCols; ++i) {
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i);
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
if ((pColInfoData->varmeta.offset[row] != -1) && (pColInfoData->pData)) {
char* p = colDataGetData(pColInfoData, row);
sz += varDataTLen(p);
if ((pColInfoData->varmeta.offset[srcRowIndex] != -1) && (pColInfoData->pData)) {
char* p = colDataGetData(pColInfoData, srcRowIndex);
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
size += getJsonValueLen(p);
} else {
size += varDataTLen(p);
}
}
sz += sizeof(pColInfoData->varmeta.offset[0]);
size += sizeof(pColInfoData->varmeta.offset[0]);
} else {
sz += pColInfoData->info.bytes;
size += pColInfoData->info.bytes;
if (((rowIdxInPage) & 0x07) == 0) {
sz += 1; // bitmap
if (((dstRowIndex) & 0x07) == 0) {
size += 1; // bitmap
}
}
}
}
return sz;
return size;
}
static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex,
SColumnInfoData* pPkCol) {
int32_t size = 0;
int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
if (pPkCol == NULL) { // no var column
ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol));
size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
size += blockDataGetRowSize(pDstBlock);
} else {
ASSERT(numOfCols == 5);
size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0);
for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i);
size += pColInfo->info.bytes;
}
// handle the pk column, the last column, may be the var char column
if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) {
char* p = colDataGetData(pPkCol, srcRowIndex);
size += varDataTLen(p);
}
size += sizeof(pPkCol->varmeta.offset[0]);
} else {
size += pPkCol->info.bytes;
if (((dstRowIndex) & 0x07) == 0) {
size += 1; // bitmap
}
}
}
return size;
}
static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock,
int32_t srcRowIndex) {
int32_t inc = 0;
if (pHandle->bSortByRowId) {
SColumnInfoData* pPkCol = NULL;
// there may be varchar column exists, so we need to get the pk info, and then calculate the row length
if (pHandle->bSortPk) {
SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
pPkCol = taosArrayGet(pSrcBlock->pDataBlock, extRowsPkOrder->slotId);
}
inc = getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pPkCol);
} else {
inc = getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex);
}
return inc;
}
static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId, SBlockOrderInfo* pPkOrderInfo) {
memset(pSup, 0, sizeof(SBlkMergeSupport));
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t));
pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*));
pSup->tsOrder = tsOrder;
pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*));
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId);
pSup->aTs[i] = (int64_t*)col->pData;
pSup->aRowIdx[i] = 0;
pSup->aBlks[i] = pBlock;
}
pSup->pPkOrder = pPkOrderInfo;
return TSDB_CODE_SUCCESS;
}
static void cleanupMergeSup(SBlkMergeSupport* pSup) {
taosMemoryFree(pSup->aRowIdx);
taosMemoryFree(pSup->aTs);
taosMemoryFree(pSup->aBlks);
}
static int32_t getTotalRows(SArray* pBlockList) {
int32_t totalRows = 0;
for (int32_t i = 0; i < taosArrayGetSize(pBlockList); ++i) {
SSDataBlock* blk = taosArrayGetP(pBlockList, i);
totalRows += blk->info.rows;
}
return totalRows;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS;
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize);
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ?
taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlkMergeSupport sup = {0};
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.tsOrder = pOrigBlockTsOrder->order;
sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*));
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
sup.aBlks[i] = blk;
}
SBlockOrderInfo* pOrigBlockTsOrder =
(!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlockOrderInfo* pOrigBlockPkOrder = NULL;
if (pHandle->bSortPk) {
pOrigBlockPkOrder = (!pHandle->bSortByRowId) ?
taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
pOrigBlockPkOrder =
(!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
}
sup.pPkOrder = pOrigBlockPkOrder;
int32_t totalRows = 0;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
totalRows += blk->info.rows;
}
initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder);
int32_t totalRows = getTotalRows(aBlk);
SMultiwayMergeTreeInfo* pTree = NULL;
__merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;
__merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
return code;
}
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
int32_t nRows = 0;
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
bool mergeLimitReached = false;
size_t blkPgSz = pageHeaderSize;
int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
int32_t minRow = sup.aRowIdx[minIdx];
int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTree);
taosArrayDestroy(aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
return code;
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTree);
taosArrayDestroy(aPgId);
cleanupMergeSup(&sup);
return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pageHeaderSize;
bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
}
break;
}
}
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
if (!pHandle->bSortByRowId) {
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
if (pHandle->bSortByRowId) {
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
} else {
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
}
blkPgSz += bufInc;
ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize);
++nRows;
@ -1572,6 +1677,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
@ -1580,18 +1686,16 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aPgId);
taosMemoryFree(pTree);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
return code;
}
}
nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
}
}
blockDataCleanup(pHandle->pDataBlock);
@ -1600,10 +1704,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
return 0;
@ -1724,7 +1825,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
@ -1736,7 +1837,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
@ -1750,7 +1851,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
@ -1759,7 +1860,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
tSimpleHashCleanup(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayDestroy(aBlkSort);
@ -2048,10 +2149,10 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param)
if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1;
int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
int32_t type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
__compar_fn_t fn = getKeyComparFunc(type, pOrder->order);
int ret = fn(lData, rData);
int32_t ret = fn(lData, rData);
if (ret == 0) {
continue;
} else {

View File

@ -720,60 +720,3 @@ void rspMonitorFn(void* param, void* tmrId) {
taosArrayDestroy(pTimeoutList);
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -34,92 +34,6 @@ static int32_t streamTaskBackupCheckpoint(const char* id, const char* path);
static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);

View File

@ -43,67 +43,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->contLen = contLen;
}
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i);
if (tEncodeI32(pEncoder, len) < 0) return -1;
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
ASSERT(pReq->blockNum > 0);
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len1;
uint64_t len2;
void* data;
if (tDecodeI32(pDecoder, &len1) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
ASSERT(len1 == len2);
taosArrayPush(pReq->dataLen, &len1);
taosArrayPush(pReq->data, &data);
}
tEndDecode(pDecoder);
return 0;
}
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
pReq->streamId = pTask->id.streamId;
@ -129,41 +68,6 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
return TSDB_CODE_SUCCESS;
}
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroy(pReq->dataLen);
}
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
uint64_t len = 0;
if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1;
pReq->retrieveLen = (int32_t)len;
tEndDecode(pDecoder);
return 0;
}
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
@ -1262,45 +1166,3 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
return 0;
}
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
int32_t size = taosArrayGetSize(pMsg->pNodeList);
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
}
// todo this new attribute will be result in being incompatible with previous version
if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo info = {0};
if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1;
taosArrayPush(pMsg->pNodeList, &info);
}
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -944,102 +944,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
taosArrayDestroy(pRecycleList);
}
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1;
}
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
for (int j = 0; j < numOfVgs; ++j) {
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
int32_t taskId = 0;
STaskStatusEntry entry = {0};
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
}
int32_t numOfVgs = 0;
if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
for (int j = 0; j < numOfVgs; ++j) {
int32_t vgId = 0;
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
taosArrayPush(pReq->pUpdateNodes, &vgId);
}
tEndDecode(pDecoder);
return 0;
}
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
@ -1048,20 +952,6 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
return false;
}
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
if (pMsg == NULL) {
return;
}
if (pMsg->pUpdateNodes != NULL) {
taosArrayDestroy(pMsg->pUpdateNodes);
}
if (pMsg->pTaskStatus != NULL) {
taosArrayDestroy(pMsg->pTaskStatus);
}
}
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
@ -1734,7 +1624,7 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);

View File

@ -137,179 +137,6 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
return pTask;
}
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1;
return 0;
}
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1;
return 0;
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1;
int32_t taskId = pTask->hTaskInfo.id.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
taskId = pTask->streamTaskId.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t taskId = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->hTaskInfo.id.taskId = taskId;
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->streamTaskId.taskId = taskId;
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
int32_t epSz = -1;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES);
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1;
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
taosMemoryFreeClear(pInfo);
return -1;
}
taosArrayPush(pTask->upstreamInfo.pList, &pInfo);
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
}
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t skip64;
int8_t skip8;

View File

@ -0,0 +1,587 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "streammsg.h"
#include "tstream.h"
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1;
return 0;
}
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1;
return 0;
}
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
int32_t size = taosArrayGetSize(pMsg->pNodeList);
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
}
// todo this new attribute will be result in being incompatible with previous version
if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo info = {0};
if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1;
taosArrayPush(pMsg->pNodeList, &info);
}
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i);
if (tEncodeI32(pEncoder, len) < 0) return -1;
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
ASSERT(pReq->blockNum > 0);
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len1;
uint64_t len2;
void* data;
if (tDecodeI32(pDecoder, &len1) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
ASSERT(len1 == len2);
taosArrayPush(pReq->dataLen, &len1);
taosArrayPush(pReq->data, &data);
}
tEndDecode(pDecoder);
return 0;
}
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroy(pReq->dataLen);
}
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
uint64_t len = 0;
if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1;
pReq->retrieveLen = (int32_t)len;
tEndDecode(pDecoder);
return 0;
}
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1;
}
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
for (int j = 0; j < numOfVgs; ++j) {
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
int32_t taskId = 0;
STaskStatusEntry entry = {0};
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
}
int32_t numOfVgs = 0;
if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
for (int j = 0; j < numOfVgs; ++j) {
int32_t vgId = 0;
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
taosArrayPush(pReq->pUpdateNodes, &vgId);
}
tEndDecode(pDecoder);
return 0;
}
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
if (pMsg == NULL) {
return;
}
if (pMsg->pUpdateNodes != NULL) {
taosArrayDestroy(pMsg->pUpdateNodes);
}
if (pMsg->pTaskStatus != NULL) {
taosArrayDestroy(pMsg->pTaskStatus);
}
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1;
int32_t taskId = pTask->hTaskInfo.id.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
taskId = pTask->streamTaskId.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t taskId = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->hTaskInfo.id.taskId = taskId;
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->streamTaskId.taskId = taskId;
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
int32_t epSz = -1;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES);
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1;
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
taosMemoryFreeClear(pInfo);
return -1;
}
taosArrayPush(pTask->upstreamInfo.pList, &pInfo);
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
}
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}