Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TS-4706-3.0

This commit is contained in:
Hongze Cheng 2024-04-26 14:57:58 +08:00
commit efdc943a3d
20 changed files with 1264 additions and 1136 deletions

View File

@ -424,7 +424,7 @@ typedef struct STaskOutputInfo {
};
int8_t type;
STokenBucket* pTokenBucket;
SArray* pDownstreamUpdateList;
SArray* pNodeEpsetUpdateList;
} STaskOutputInfo;
typedef struct SUpstreamInfo {
@ -445,6 +445,8 @@ typedef struct STaskCheckInfo {
int32_t notReadyTasks;
int32_t inCheckProcess;
int32_t stopCheckProcess;
int32_t notReadyRetryCount;
int32_t timeoutRetryCount;
tmr_h checkRspTmr;
TdThreadMutex checkInfoLock;
} STaskCheckInfo;
@ -484,7 +486,7 @@ struct SStreamTask {
SSHashObj* pNameMap;
void* pBackend;
int8_t subtableWithoutMd5;
char reserve[255];
char reserve[256];
};
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
@ -848,7 +850,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id);
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);

View File

@ -57,9 +57,9 @@ else
arch=$cpuType
fi
echo "${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r ${arch} -e taoskeeper"
echo "${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r ${arch} -e taoskeeper -t ver-${tdengine_ver}"
echo "$top_dir=${top_dir}"
taoskeeper_binary=`${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r $arch -e taoskeeper`
taoskeeper_binary=`${top_dir}/../enterprise/packaging/build_taoskeeper.sh -r $arch -e taoskeeper -t ver-${tdengine_ver}`
echo "taoskeeper_binary: ${taoskeeper_binary}"
# copy config files
@ -76,6 +76,13 @@ if [ -f "${compile_dir}/test/cfg/taosadapter.service" ]; then
cp ${compile_dir}/test/cfg/taosadapter.service ${pkg_dir}${install_home_path}/cfg || :
fi
if [ -f "%{_compiledir}/../../../explorer/target/taos-explorer.service" ]; then
cp %{_compiledir}/../../../explorer/target/taos-explorer.service ${pkg_dir}${install_home_path}/cfg || :
fi
if [ -f "%{_compiledir}/../../../explorer/server/example/explorer.toml" ]; then
cp %{_compiledir}/../../../explorer/server/example/explorer.toml ${pkg_dir}${install_home_path}/cfg || :
fi
cp ${taoskeeper_binary} ${pkg_dir}${install_home_path}/bin
#cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_path}/init.d
cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script
@ -93,6 +100,10 @@ if [ -f "${compile_dir}/build/bin/taosadapter" ]; then
cp ${compile_dir}/build/bin/taosadapter ${pkg_dir}${install_home_path}/bin ||:
fi
if [ -f "${compile_dir}/../../../explorer/target/release/taos-explorer" ]; then
cp ${compile_dir}/../../../explorer/target/release/taos-explorer ${pkg_dir}${install_home_path}/bin ||:
fi
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
[ -f ${compile_dir}/build/lib/${wslibfile} ] && cp ${compile_dir}/build/lib/${wslibfile} ${pkg_dir}${install_home_path}/driver ||:

View File

@ -72,6 +72,14 @@ if [ -f %{_compiledir}/../build-taoskeeper/taoskeeper.service ]; then
cp %{_compiledir}/../build-taoskeeper/taoskeeper.service %{buildroot}%{homepath}/cfg ||:
fi
if [ -f %{_compiledir}/../../../explorer/target/taos-explorer.service ]; then
cp %{_compiledir}/../../../explorer/target/taos-explorer.service %{buildroot}%{homepath}/cfg ||:
fi
if [ -f %{_compiledir}/../../../explorer/server/example/explorer.toml ]; then
cp %{_compiledir}/../../../explorer/server/example/explorer.toml %{buildroot}%{homepath}/cfg ||:
fi
#cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d
cp %{_compiledir}/../packaging/tools/post.sh %{buildroot}%{homepath}/script
cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/script
@ -84,6 +92,10 @@ cp %{_compiledir}/build/bin/udfd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosBenchmark %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
if [ -f %{_compiledir}/../../../explorer/target/release/taos-explorer ]; then
cp %{_compiledir}/../../../explorer/target/release/taos-explorer %{buildroot}%{homepath}/bin
fi
if [ -f %{_compiledir}/../build-taoskeeper/taoskeeper ]; then
cp %{_compiledir}/../build-taoskeeper/taoskeeper %{buildroot}%{homepath}/bin
fi

File diff suppressed because it is too large Load Diff

View File

@ -231,12 +231,8 @@ fi
if [ "$verMode" == "cluster" ]; then
sed 's/verMode=edge/verMode=cluster/g' ${install_dir}/bin/remove.sh >>remove_temp.sh
sed -i "s/serverName2=\"taosd\"/serverName2=\"${serverName2}\"/g" remove_temp.sh
sed -i "s/clientName2=\"taos\"/clientName2=\"${clientName2}\"/g" remove_temp.sh
sed -i "s/configFile2=\"taos.cfg\"/configFile2=\"${clientName2}.cfg\"/g" remove_temp.sh
sed -i "s/productName2=\"TDengine\"/productName2=\"${productName2}\"/g" remove_temp.sh
cusDomain=`echo "${cusEmail2}" | sed 's/^[^@]*@//'`
sed -i "s/emailName2=\"taosdata.com\"/emailName2=\"${cusDomain}\"/g" remove_temp.sh
sed -i "s/PREFIX=\"taos\"/PREFIX=\"${serverName2}\"/g" remove_temp.sh
sed -i "s/productName=\"TDengine\"/productName=\"${productName2}\"/g" remove_temp.sh
mv remove_temp.sh ${install_dir}/bin/remove.sh
fi
if [ "$verMode" == "cloud" ]; then
@ -262,12 +258,10 @@ cp ${install_files} ${install_dir}
cp ${install_dir}/install.sh install_temp.sh
if [ "$verMode" == "cluster" ]; then
sed -i 's/verMode=edge/verMode=cluster/g' install_temp.sh
sed -i "s/serverName2=\"taosd\"/serverName2=\"${serverName2}\"/g" install_temp.sh
sed -i "s/clientName2=\"taos\"/clientName2=\"${clientName2}\"/g" install_temp.sh
sed -i "s/configFile2=\"taos.cfg\"/configFile2=\"${clientName2}.cfg\"/g" install_temp.sh
sed -i "s/productName2=\"TDengine\"/productName2=\"${productName2}\"/g" install_temp.sh
sed -i "s/PREFIX=\"taos\"/PREFIX=\"${serverName2}\"/g" install_temp.sh
sed -i "s/productName=\"TDengine\"/productName=\"${productName2}\"/g" install_temp.sh
cusDomain=`echo "${cusEmail2}" | sed 's/^[^@]*@//'`
sed -i "s/emailName2=\"taosdata.com\"/emailName2=\"${cusDomain}\"/g" install_temp.sh
sed -i "s/emailName=\"taosdata.com\"/emailName=\"${cusDomain}\"/g" install_temp.sh
mv install_temp.sh ${install_dir}/install.sh
fi
if [ "$verMode" == "cloud" ]; then
@ -367,8 +361,7 @@ if [ "$verMode" == "cluster" ]; then
# copy taosx
if [ -d ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ]; then
cp -r ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ${install_dir}
cp ${top_dir}/../enterprise/packaging/install_taosx.sh ${install_dir}/taosx
cp -r ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ${install_dir}
cp ${top_dir}/../enterprise/src/plugins/taosx/packaging/uninstall.sh ${install_dir}/taosx
sed -i 's/target=\"\"/target=\"taosx\"/g' ${install_dir}/taosx/uninstall.sh
fi

View File

@ -115,6 +115,30 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
return string;
}
static int32_t setCompressOption(cJSON* json, uint32_t para) {
uint8_t encode = COMPRESS_L1_TYPE_U32(para);
if (encode != 0) {
const char* encodeStr = columnEncodeStr(encode);
cJSON* encodeJson = cJSON_CreateString(encodeStr);
cJSON_AddItemToObject(json, "encode", encodeJson);
return 0;
}
uint8_t compress = COMPRESS_L2_TYPE_U32(para);
if (compress != 0) {
const char* compressStr = columnCompressStr(compress);
cJSON* compressJson = cJSON_CreateString(compressStr);
cJSON_AddItemToObject(json, "compress", compressJson);
return 0;
}
uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
if (level != 0) {
const char* levelStr = columnLevelStr(level);
cJSON* levelJson = cJSON_CreateString(levelStr);
cJSON_AddItemToObject(json, "level", levelJson);
return 0;
}
return 0;
}
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
SMAlterStbReq req = {0};
cJSON* json = NULL;
@ -199,6 +223,13 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
cJSON_AddItemToObject(json, "colNewName", colNewName);
break;
}
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
cJSON* colName = cJSON_CreateString(field->name);
cJSON_AddItemToObject(json, "colName", colName);
setCompressOption(json, field->bytes);
break;
}
default:
break;
}
@ -568,6 +599,12 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
break;
}
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
setCompressOption(json, vAlterTbReq.compress);
break;
}
default:
break;
}

View File

@ -827,7 +827,11 @@ TEST(clientCase, projection_query_tables) {
// }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use cache_1");
TAOS_RES* pRes = taos_query(pConn, "alter local 'fqdn 127.0.0.1'");
if (taos_errno(pRes) != 0) {
printf("failed to exec query, %s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");

View File

@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
int32_t tsTransPullupInterval = 2;
int32_t tsCompactPullupInterval = 10;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 60;
int32_t tsStreamCheckpointInterval = 300;
float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 16;
int32_t tsTtlUnit = 86400;

View File

@ -69,7 +69,7 @@
static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq);
static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq);
static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp);
static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pRsp);
static int32_t tDecodeTableTSMAInfoRsp(SDecoder *pDecoder, STableTSMAInfoRsp *pRsp);
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
@ -895,8 +895,8 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
if (tEncodeI64(&encoder, pReq->normSourceTbUid) < 0) return -1;
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1;
for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) {
SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i);
for (int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) {
SVgroupVer *p = taosArrayGet(pReq->pVgroupVerList, i);
if (tEncodeI32(&encoder, p->vgId) < 0) return -1;
if (tEncodeI64(&encoder, p->ver) < 0) return -1;
}
@ -8000,7 +8000,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->smaId)< 0) return -1;
if (tDecodeI64(&decoder, &pReq->smaId) < 0) return -1;
}
tEndDecode(&decoder);
@ -8445,8 +8445,8 @@ static int32_t tDecodeSVDropTbRsp(SDecoder *pCoder, SVDropTbRsp *pReq) {
}
int32_t tEncodeSVDropTbBatchReq(SEncoder *pCoder, const SVDropTbBatchReq *pReq) {
int32_t nReqs = taosArrayGetSize(pReq->pArray);
SVDropTbReq *pDropTbReq;
int32_t nReqs = taosArrayGetSize(pReq->pArray);
SVDropTbReq *pDropTbReq;
if (tStartEncode(pCoder) < 0) return -1;
@ -8709,6 +8709,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
if (tEncodeU32(pEncoder, pReq->compress) < 0) return -1;
break;
default:
@ -8763,6 +8764,7 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq)
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
if (tDecodeU32(pDecoder, &pReq->compress) < 0) return -1;
break;
default:
@ -9200,7 +9202,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRspCommon *pRsp)
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const void *pRsp) {
if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1;
if (tEncodeI64(pEncoder, ((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1;
if (tEncodeI64(pEncoder, ((SMqDataRsp *)pRsp)->sleepTime) < 0) return -1;
return 0;
}
@ -9253,7 +9255,7 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRspCommon *pRsp) {
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) {
if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1;
if (tDecodeI64(pDecoder, &((SMqDataRsp *)pRsp)->sleepTime) < 0) return -1;
}
return 0;
@ -9272,9 +9274,7 @@ static void tDeleteMqDataRspCommon(void *rsp) {
tOffsetDestroy(&pRsp->rspOffset);
}
void tDeleteMqDataRsp(void *rsp) {
tDeleteMqDataRspCommon(rsp);
}
void tDeleteMqDataRsp(void *rsp) { tDeleteMqDataRspCommon(rsp); }
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const void *rsp) {
if (tEncodeMqDataRspCommon(pEncoder, rsp) < 0) return -1;
@ -9300,7 +9300,7 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, void *rsp) {
pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *));
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void * pCreate = NULL;
void *pCreate = NULL;
uint64_t len = 0;
if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1;
int32_t l = (int32_t)len;
@ -10114,7 +10114,7 @@ void setFieldWithOptions(SFieldWithOptions *fieldWithOptions, SField *field) {
fieldWithOptions->type = field->type;
strncpy(fieldWithOptions->name, field->name, TSDB_COL_NAME_LEN);
}
int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAInfoReq* pReq) {
int32_t tSerializeTableTSMAInfoReq(void *buf, int32_t bufLen, const STableTSMAInfoReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -10129,13 +10129,13 @@ int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAIn
return tlen;
}
int32_t tDeserializeTableTSMAInfoReq(void* buf, int32_t bufLen, STableTSMAInfoReq* pReq) {
int32_t tDeserializeTableTSMAInfoReq(void *buf, int32_t bufLen, STableTSMAInfoReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, (uint8_t*)&pReq->fetchingWithTsmaName) < 0) return -1;
if (tDecodeI8(&decoder, (uint8_t *)&pReq->fetchingWithTsmaName) < 0) return -1;
tEndDecode(&decoder);
@ -10143,7 +10143,7 @@ int32_t tDeserializeTableTSMAInfoReq(void* buf, int32_t bufLen, STableTSMAInfoRe
return 0;
}
static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pTsmaInfo) {
static int32_t tEncodeTableTSMAInfo(SEncoder *pEncoder, const STableTSMAInfo *pTsmaInfo) {
if (tEncodeCStr(pEncoder, pTsmaInfo->name) < 0) return -1;
if (tEncodeU64(pEncoder, pTsmaInfo->tsmaId) < 0) return -1;
if (tEncodeCStr(pEncoder, pTsmaInfo->tb) < 0) return -1;
@ -10160,7 +10160,7 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT
int32_t size = pTsmaInfo->pFuncs ? pTsmaInfo->pFuncs->size : 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
STableTSMAFuncInfo* pFuncInfo = taosArrayGet(pTsmaInfo->pFuncs, i);
STableTSMAFuncInfo *pFuncInfo = taosArrayGet(pTsmaInfo->pFuncs, i);
if (tEncodeI32(pEncoder, pFuncInfo->funcId) < 0) return -1;
if (tEncodeI16(pEncoder, pFuncInfo->colId) < 0) return -1;
}
@ -10168,13 +10168,13 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT
size = pTsmaInfo->pTags ? pTsmaInfo->pTags->size : 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
const SSchema* pSchema = taosArrayGet(pTsmaInfo->pTags, i);
const SSchema *pSchema = taosArrayGet(pTsmaInfo->pTags, i);
if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1;
}
size = pTsmaInfo->pUsedCols ? pTsmaInfo->pUsedCols->size : 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
const SSchema* pSchema = taosArrayGet(pTsmaInfo->pUsedCols, i);
const SSchema *pSchema = taosArrayGet(pTsmaInfo->pUsedCols, i);
if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1;
}
@ -10187,7 +10187,7 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT
return 0;
}
static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInfo) {
static int32_t tDecodeTableTSMAInfo(SDecoder *pDecoder, STableTSMAInfo *pTsmaInfo) {
if (tDecodeCStrTo(pDecoder, pTsmaInfo->name) < 0) return -1;
if (tDecodeU64(pDecoder, &pTsmaInfo->tsmaId) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTsmaInfo->tb) < 0) return -1;
@ -10219,7 +10219,7 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
if (!pTsmaInfo->pTags) return -1;
for (int32_t i = 0; i < size; ++i) {
SSchema schema = {0};
if(tDecodeSSchema(pDecoder, &schema) < 0) return -1;
if (tDecodeSSchema(pDecoder, &schema) < 0) return -1;
taosArrayPush(pTsmaInfo->pTags, &schema);
}
}
@ -10239,7 +10239,7 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
if (tDecodeI64(pDecoder, &pTsmaInfo->reqTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pTsmaInfo->rspTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pTsmaInfo->delayDuration) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&pTsmaInfo->fillHistoryFinished) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t *)&pTsmaInfo->fillHistoryFinished) < 0) return -1;
return 0;
}
@ -10247,13 +10247,13 @@ static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoR
int32_t size = pRsp->pTsmas ? pRsp->pTsmas->size : 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pRsp->pTsmas, i);
STableTSMAInfo *pInfo = taosArrayGetP(pRsp->pTsmas, i);
if (tEncodeTableTSMAInfo(pEncoder, pInfo) < 0) return -1;
}
return 0;
}
static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pRsp) {
static int32_t tDecodeTableTSMAInfoRsp(SDecoder *pDecoder, STableTSMAInfoRsp *pRsp) {
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
if (size <= 0) return 0;
@ -10268,7 +10268,7 @@ static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pR
return 0;
}
int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp) {
int32_t tSerializeTableTSMAInfoRsp(void *buf, int32_t bufLen, const STableTSMAInfoRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -10282,7 +10282,7 @@ int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAIn
return tlen;
}
int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pRsp) {
int32_t tDeserializeTableTSMAInfoRsp(void *buf, int32_t bufLen, STableTSMAInfoRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -10295,7 +10295,7 @@ int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRs
return 0;
}
void tFreeTableTSMAInfo(void* p) {
void tFreeTableTSMAInfo(void *p) {
STableTSMAInfo *pTsmaInfo = p;
if (pTsmaInfo) {
taosArrayDestroy(pTsmaInfo->pFuncs);
@ -10305,20 +10305,20 @@ void tFreeTableTSMAInfo(void* p) {
}
}
void tFreeAndClearTableTSMAInfo(void* p) {
STableTSMAInfo* pTsmaInfo = (STableTSMAInfo*)p;
void tFreeAndClearTableTSMAInfo(void *p) {
STableTSMAInfo *pTsmaInfo = (STableTSMAInfo *)p;
if (pTsmaInfo) {
tFreeTableTSMAInfo(pTsmaInfo);
taosMemoryFree(pTsmaInfo);
}
}
int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes) {
int32_t tCloneTbTSMAInfo(STableTSMAInfo *pInfo, STableTSMAInfo **pRes) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pInfo) {
return TSDB_CODE_SUCCESS;
}
STableTSMAInfo* pRet = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
STableTSMAInfo *pRet = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pRet) return TSDB_CODE_OUT_OF_MEMORY;
*pRet = *pInfo;
@ -10357,7 +10357,7 @@ static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgres
return 0;
}
int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProgressReq* pReq) {
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -10371,7 +10371,7 @@ int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProg
return tlen;
}
static int32_t tDecodeStreamProgressReq(SDecoder* pDecoder, SStreamProgressReq* pReq) {
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->fetchIdx) < 0) return -1;
@ -10379,7 +10379,7 @@ static int32_t tDecodeStreamProgressReq(SDecoder* pDecoder, SStreamProgressReq*
return 0;
}
int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgressReq* pReq) {
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
@ -10392,7 +10392,7 @@ int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgress
return 0;
}
static int32_t tEncodeStreamProgressRsp(SEncoder* pEncoder, const SStreamProgressRsp* pRsp) {
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->fillHisFinished) < 0) return -1;
@ -10402,7 +10402,7 @@ static int32_t tEncodeStreamProgressRsp(SEncoder* pEncoder, const SStreamProgres
return 0;
}
int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp) {
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -10416,17 +10416,17 @@ int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProg
return tlen;
}
static int32_t tDecodeStreamProgressRsp(SDecoder* pDecoder, SStreamProgressRsp* pRsp) {
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&pRsp->fillHisFinished) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->progressDelay) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->fetchIdx) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->subFetchIdx) < 0) return -1;
return 0;
}
int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp) {
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -10440,22 +10440,22 @@ int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgres
}
int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder *pEncoder, const SMDropTbReqsOnSingleVg *pReq) {
const SVgroupInfo* pVgInfo = &pReq->vgInfo;
const SVgroupInfo *pVgInfo = &pReq->vgInfo;
if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pVgInfo->epSet) < 0) return -1;
if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1;
int32_t size = pReq->pTbs ? pReq->pTbs->size: 0;
int32_t size = pReq->pTbs ? pReq->pTbs->size : 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
const SVDropTbReq* pInfo = taosArrayGet(pReq->pTbs, i);
const SVDropTbReq *pInfo = taosArrayGet(pReq->pTbs, i);
if (tEncodeSVDropTbReq(pEncoder, pInfo) < 0) return -1;
}
return 0;
}
int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* pReq) {
int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder *pDecoder, SMDropTbReqsOnSingleVg *pReq) {
if (tDecodeI32(pDecoder, &pReq->vgInfo.vgId) < 0) return -1;
if (tDecodeU32(pDecoder, &pReq->vgInfo.hashBegin) < 0) return -1;
if (tDecodeU32(pDecoder, &pReq->vgInfo.hashEnd) < 0) return -1;
@ -10477,18 +10477,18 @@ int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg*
}
void tFreeSMDropTbReqOnSingleVg(void *p) {
SMDropTbReqsOnSingleVg* pReq = p;
SMDropTbReqsOnSingleVg *pReq = p;
taosArrayDestroy(pReq->pTbs);
}
int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq){
int32_t tSerializeSMDropTbsReq(void *buf, int32_t bufLen, const SMDropTbsReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
tStartEncode(&encoder);
int32_t size = pReq->pVgReqs ? pReq->pVgReqs->size : 0;
if (tEncodeI32(&encoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
SMDropTbReqsOnSingleVg* pVgReq = taosArrayGet(pReq->pVgReqs, i);
SMDropTbReqsOnSingleVg *pVgReq = taosArrayGet(pReq->pVgReqs, i);
if (tEncodeSMDropTbReqOnSingleVg(&encoder, pVgReq) < 0) return -1;
}
tEndEncode(&encoder);
@ -10497,7 +10497,7 @@ int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pR
return tlen;
}
int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq) {
int32_t tDeserializeSMDropTbsReq(void *buf, int32_t bufLen, SMDropTbsReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
tStartDecode(&decoder);
@ -10518,12 +10518,12 @@ int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq)
return 0;
}
void tFreeSMDropTbsReq(void* p) {
SMDropTbsReq* pReq = p;
void tFreeSMDropTbsReq(void *p) {
SMDropTbsReq *pReq = p;
taosArrayDestroyEx(pReq->pVgReqs, tFreeSMDropTbReqOnSingleVg);
}
int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredTbsRsp* pRsp) {
int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder *pCoder, const SVFetchTtlExpiredTbsRsp *pRsp) {
if (tEncodeI32(pCoder, pRsp->vgId) < 0) return -1;
int32_t size = pRsp->pExpiredTbs ? pRsp->pExpiredTbs->size : 0;
if (tEncodeI32(pCoder, size) < 0) return -1;
@ -10533,7 +10533,7 @@ int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredT
return 0;
}
int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp) {
int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder *pCoder, SVFetchTtlExpiredTbsRsp *pRsp) {
if (tDecodeI32(pCoder, &pRsp->vgId) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pCoder, &size) < 0) return -1;
@ -10549,7 +10549,7 @@ int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp*
return 0;
}
void tFreeFetchTtlExpiredTbsRsp(void* p) {
SVFetchTtlExpiredTbsRsp* pRsp = p;
void tFreeFetchTtlExpiredTbsRsp(void *p) {
SVFetchTtlExpiredTbsRsp *pRsp = p;
taosArrayDestroy(pRsp->pExpiredTbs);
}

View File

@ -1457,8 +1457,8 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->igUpdate = 0;
pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
pCxt->pCreateStreamReq->sql = taosStrdup(pCxt->pCreateSmaReq->sql);
// construct tags
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
@ -1494,7 +1494,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
pCxt->pDropStreamReq->igNotExists = false;
pCxt->pDropStreamReq->sql = strdup(pCxt->pDropSmaReq->name);
pCxt->pDropStreamReq->sql = taosStrdup(pCxt->pDropSmaReq->name);
pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
}

View File

@ -195,13 +195,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
const char* idstr = pTask->id.idStr;
if (pMeta->updateInfo.transId != req.transId) {
ASSERT(req.transId > pMeta->updateInfo.transId);
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
vgId, req.transId, pMeta->updateInfo.transId);
if (req.transId < pMeta->updateInfo.transId) {
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
pMeta->updateInfo.transId, req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = req.transId;
taosArrayDestroy(req.pNodeList);
return rsp.code;
} else {
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
vgId, req.transId, pMeta->updateInfo.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = req.transId;
}
} else {
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
}

View File

@ -64,13 +64,15 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
col_id_t colId = -1;
SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t));
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) {
funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i);
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
}
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
if (slotIds[i] == -1) {
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {

View File

@ -1054,14 +1054,30 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
}
}
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo) {
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo, SBlockLoadSuppInfo* pSupp) {
record->uid = pBlockInfo->uid;
record->firstKey = (STsdbRowKey){
.key = {.ts = pBlockInfo->firstKey, .numOfPKs = 0},
};
record->lastKey = (STsdbRowKey){
.key = {.ts = pBlockInfo->lastKey, .numOfPKs = 0},
};
record->firstKey = (STsdbRowKey){.key = {.ts = pBlockInfo->firstKey, .numOfPKs = pSupp->numOfPks}};
record->lastKey = (STsdbRowKey){.key = {.ts = pBlockInfo->lastKey, .numOfPKs = pSupp->numOfPks}};
if (pSupp->numOfPks > 0) {
SValue* pFirst = &record->firstKey.key.pks[0];
SValue* pLast = &record->lastKey.key.pks[0];
pFirst->type = pSupp->pk.type;
pLast->type = pSupp->pk.type;
if (IS_VAR_DATA_TYPE(pFirst->type)) {
pFirst->pData = (uint8_t*) varDataVal(pBlockInfo->firstPk.pData);
pFirst->nData = varDataLen(pBlockInfo->firstPk.pData);
pLast->pData = (uint8_t*) varDataVal(pBlockInfo->lastPk.pData);
pLast->nData = varDataLen(pBlockInfo->lastPk.pData);
} else {
pFirst->val = pBlockInfo->firstPk.val;
pLast->val = pBlockInfo->lastPk.val;
}
}
record->minVer = pBlockInfo->minVer;
record->maxVer = pBlockInfo->maxVer;
record->blockOffset = pBlockInfo->blockOffset;
@ -1091,7 +1107,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
int32_t step = asc ? 1 : -1;
SBrinRecord tmp;
blockInfoToRecord(&tmp, pBlockInfo);
blockInfoToRecord(&tmp, pBlockInfo, pSupInfo);
SBrinRecord* pRecord = &tmp;
// no data exists, return directly.
@ -1290,7 +1306,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord tmp;
blockInfoToRecord(&tmp, pBlockInfo);
blockInfoToRecord(&tmp, pBlockInfo, pSup);
SBrinRecord* pRecord = &tmp;
code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pSchema, &pSup->colId[1],
pSup->numOfCols - 1);
@ -1325,9 +1341,9 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
}
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord) {
static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord, SBlockLoadSuppInfo* pSupInfo) {
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1;
@ -1341,7 +1357,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
blockInfoToRecord(pRecord, p);
blockInfoToRecord(pRecord, p, pSupInfo);
*nextIndex = pBlockInfo->tbBlockIdx + step;
return true;
@ -1391,12 +1407,40 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
}
// todo: this attribute could be acquired during extractin the global ordered block list.
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) {
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order, int32_t pkType, int32_t numOfPk) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) {
return pBlock->lastKey == pRec->firstKey.key.ts;
if (pBlock->lastKey == pRec->firstKey.key.ts) {
if (numOfPk > 0) {
SValue v1 = {.type = pkType};
if (IS_VAR_DATA_TYPE(pkType)) {
v1.pData = (uint8_t*)varDataVal(pBlock->lastPk.pData), v1.nData = varDataLen(pBlock->lastPk.pData);
} else {
v1.val = pBlock->lastPk.val;
}
return (tValueCompare(&v1, &pRec->firstKey.key.pks[0]) == 0);
} else { // no pk
return true;
}
} else {
return false;
}
} else {
return pBlock->firstKey == pRec->lastKey.key.ts;
if (pBlock->firstKey == pRec->lastKey.key.ts) {
if (numOfPk > 0) {
SValue v1 = {.type = pkType};
if (IS_VAR_DATA_TYPE(pkType)) {
v1.pData = (uint8_t*)varDataVal(pBlock->firstPk.pData), v1.nData = varDataLen(pBlock->firstPk.pData);
} else {
v1.val = pBlock->firstPk.val;
}
return (tValueCompare(&v1, &pRec->lastKey.key.pks[0]) == 0);
} else { // no pk
return true;
}
} else {
return false;
}
}
}
@ -1430,23 +1474,26 @@ static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersio
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
SBrinRecord rec = {0};
int32_t neighborIndex = 0;
SBrinRecord rec = {0};
int32_t neighborIndex = 0;
int32_t order = pReader->info.order;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
pReader->info.order, &rec);
bool hasNeighbor =
getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec, pSupInfo);
// overlap with neighbor
if (hasNeighbor) {
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order);
pInfo->overlapWithNeighborBlock =
overlapWithNeighborBlock2(pBlockInfo, &rec, order, pSupInfo->pk.type, pSupInfo->numOfPks);
}
SBrinRecord pRecord;
blockInfoToRecord(&pRecord, pBlockInfo);
blockInfoToRecord(&pRecord, pBlockInfo, pSupInfo);
// has duplicated ts of different version in this block
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, order);
// todo handle the primary key overlap case
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
@ -2380,28 +2427,30 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, bool* loadNeighbor) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
int32_t nextIndex = -1;
int32_t code = TSDB_CODE_SUCCESS;
int32_t order = pReader->info.order;
SDataBlockIter* pIter = &pReader->status.blockIter;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
int32_t nextIndex = -1;
SBrinRecord rec = {0};
*loadNeighbor = false;
SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex,
pReader->info.order, &rec);
bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec, pSupInfo);
if (!hasNeighbor) { // do nothing
return code;
}
if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order)) { // load next block
// load next block
if (overlapWithNeighborBlock2(pBlockInfo, &rec, order, pReader->suppInfo.pk.type, pReader->suppInfo.numOfPks)) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
// 2. remove it from the scan block list
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
@ -4836,11 +4885,11 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
return TSDB_CODE_SUCCESS;
}
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pResBlock->info.id.uid != pFBlock->uid) {
if (pResBlock->info.id.uid != pBlockInfo->uid) {
return TSDB_CODE_SUCCESS;
}
@ -4848,10 +4897,10 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
TARRAY2_CLEAR(&pSup->colAggArray, 0);
SBrinRecord pRecord;
blockInfoToRecord(&pRecord, pFBlock);
blockInfoToRecord(&pRecord, pBlockInfo, pSup);
code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pRecord, &pSup->colAggArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pBlockInfo->uid, tstrerror(code),
pReader->idStr);
return code;
}
@ -4880,7 +4929,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
}
// do fill all null column value SMA info
doFillNullColSMA(pSup, pFBlock->numRow, numOfCols, pTsAgg);
doFillNullColSMA(pSup, pBlockInfo->numRow, numOfCols, pTsAgg);
size_t size = pSup->colAggArray.size;
@ -4906,7 +4955,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
// double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.smaLoadTime += 0; // elapsedTime;
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pBlockInfo->uid, pReader->idStr);
return code;
}

View File

@ -2179,6 +2179,7 @@ int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
}
_EXIT:
rocksdb_writebatch_destroy(wb);
rocksdb_iter_destroy(pIter);
rocksdb_readoptions_destroy(pRdOpt);
taosMemoryFree(err);

View File

@ -0,0 +1,665 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "rsync.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
static void processDownstreamReadyRsp(SStreamTask* pTask);
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
static void rspMonitorFn(void* param, void* tmrId);
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
// check status
void streamTaskCheckDownstream(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId,
.stage = pTask->pMeta->stage,
};
ASSERT(pTask->status.downstreamReady == 0);
// serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.reqId = tGenIdPI64();
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
processDownstreamReadyRsp(pTask);
}
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t total = streamTaskGetNumOfDownstream(pTask);
int32_t left = -1;
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id);
return TSDB_CODE_SUCCESS;
}
if (pRsp->status == TASK_DOWNSTREAM_READY) {
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
if (left == 0) {
processDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
streamTaskStopMonitorCheckRsp(pInfo, id);
} else {
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
} else { // not ready, wait for 100ms and retry
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS; // return success in any cases.
}
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
int32_t startTs = pTask->execInfo.checkTs;
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
ASSERT(left > 0);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
}
return 0;
}
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
taosThreadMutexLock(&pInfo->checkInfoLock);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosArrayPush(pInfo->pList, &info);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
}
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
} else {
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return 0;
}
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, false, id);
pInfo->stopCheckProcess = 1;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set stop check rsp mon", id);
return TSDB_CODE_SUCCESS;
}
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
ASSERT(pInfo->inCheckProcess == 0);
pInfo->pList = taosArrayDestroy(pInfo->pList);
if (pInfo->checkRspTmr != NULL) {
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
pInfo->checkRspTmr = NULL;
}
taosThreadMutexDestroy(&pInfo->checkInfoLock);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void processDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
int64_t checkTs = pTask->execInfo.checkTs;
int64_t readyTs = pTask->execInfo.readyTs;
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
// halt it self for count window stream task until the related fill history task completed.
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
}
// start the related fill-history task, when current task is ready
// not invoke in success callback due to the deadlock.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
}
void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
if (p->nodeId == nodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = nodeId};
taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
t.nodeId, (num + 1));
}
taosThreadMutexUnlock(&pTask->lock);
}
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
taosArrayClear(pInfo->pList);
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
pInfo->notReadyTasks = 1;
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
}
pInfo->startTs = startTs;
return TSDB_CODE_SUCCESS;
}
SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p->taskId == taskId) {
return p;
}
}
return NULL;
}
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
if (reqId != p->reqId) {
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
" expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
}
// subtract one not-ready-task, since it is ready now
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
} else {
*pNotReady = pInfo->notReadyTasks;
}
p->status = status;
p->rspTs = rspTs;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId,
reqId);
return TSDB_CODE_FAILED;
}
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
if (pInfo->inCheckProcess == 0) {
pInfo->inCheckProcess = 1;
} else {
ASSERT(pInfo->startTs > 0);
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
return TSDB_CODE_FAILED;
}
stDebug("s-task:%s set the in-check-procedure flag", id);
return 0;
}
int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
if (lock) {
taosThreadMutexLock(&pInfo->checkInfoLock);
}
if (!pInfo->inCheckProcess) {
stWarn("s-task:%s already not in-check-procedure", id);
}
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0;
pInfo->notReadyTasks = 0;
pInfo->inCheckProcess = 0;
pInfo->stopCheckProcess = 0;
pInfo->notReadyRetryCount = 0;
pInfo->timeoutRetryCount = 0;
taosArrayClear(pInfo->pList);
if (lock) {
taosThreadMutexUnlock(&pInfo->checkInfoLock);
}
return 0;
}
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId,
.stage = pTask->pMeta->stage,
};
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = p->reqId;
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (p->taskId == pVgInfo->taskId) {
req.reqId = p->reqId;
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
p->reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break;
}
}
} else {
ASSERT(0);
}
}
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->status == TASK_DOWNSTREAM_READY) {
(*numOfReady) += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
p->taskId);
(*numOfFault) += 1;
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
ASSERT(p->status == -1);
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
}
} else {
taosArrayPush(pNotReadyList, &p->taskId);
}
}
}
}
void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs;
ETaskStatus state = pStat->state;
const char* id = pTask->id.idStr;
int32_t numOfReady = 0;
int32_t numOfFault = 0;
int32_t numOfNotRsp = 0;
int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0;
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
return;
}
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id);
return;
}
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->notReadyTasks == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
vgId, ref);
streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return;
}
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) {
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
}
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
if (numOfFault > 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
return;
}
// checking of downstream tasks has been stopped by other threads
if (pInfo->stopCheckProcess == 1) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
"fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
return;
}
if (numOfNotReady > 0) { // check to make sure not in recheck timer
ASSERT(pTask->status.downstreamReady == 0);
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
}
pInfo->notReadyRetryCount += 1;
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
// todo add into node update list and send to mnode
if (numOfTimeout > 0) {
ASSERT(pTask->status.downstreamReady == 0);
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
}
}
pInfo->timeoutRetryCount += 1;
// timeout more than 100 sec, add into node update list
if (pInfo->timeoutRetryCount > 10) {
pInfo->timeoutRetryCount = 0;
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId,
numOfTimeout);
} else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
}
}
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -1073,9 +1073,9 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
for (int j = 0; j < num; ++j) {
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j);
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);
bool exist = existInHbMsg(pMsg, pTaskEpset);
if (!exist) {
@ -1085,7 +1085,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
}
}
taosArrayClear(pTask->outputInfo.pDownstreamUpdateList);
taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
taosThreadMutexUnlock(&pTask->lock);
}

View File

@ -32,12 +32,12 @@ typedef struct SLaunchHTaskInfo {
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId,
int32_t hTaskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
@ -165,67 +165,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
return 0;
}
// check status
void streamTaskCheckDownstream(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId,
.stage = pTask->pMeta->stage,
};
ASSERT(pTask->status.downstreamReady == 0);
// serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.reqId = tGenIdPI64();
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
doProcessDownstreamReadyRsp(pTask);
}
}
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
@ -327,122 +266,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
int64_t checkTs = pTask->execInfo.checkTs;
int64_t readyTs = pTask->execInfo.readyTs;
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
// halt it self for count window stream task until the related fill history task completed.
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
}
// start the related fill-history task, when current task is ready
// not invoke in success callback due to the deadlock.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
}
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == nodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = nodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t total = streamTaskGetNumOfDownstream(pTask);
int32_t left = -1;
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id);
return TSDB_CODE_SUCCESS;
}
if (pRsp->status == TASK_DOWNSTREAM_READY) {
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
if (left == 0) {
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
streamTaskStopMonitorCheckRsp(pInfo, id);
} else {
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
} else { // not ready, wait for 100ms and retry
int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS; // return success in any cases.
}
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
int32_t startTs = pTask->execInfo.checkTs;
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
ASSERT(left > 0);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
}
return 0;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
@ -798,64 +621,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
}
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;

View File

@ -1094,13 +1094,10 @@ _end:
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
#ifdef USE_ROCKSDB
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
streamStatePutParName_rocksdb(pState, groupId, tbname);
}
return TSDB_CODE_SUCCESS;
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
streamStatePutParName_rocksdb(pState, groupId, tbname);
}
tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
return TSDB_CODE_SUCCESS;
#else
return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
@ -1112,10 +1109,11 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
#ifdef USE_ROCKSDB
void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
if (!pStr) {
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
return streamStateGetParName_rocksdb(pState, groupId, pVal);
int32_t code = streamStateGetParName_rocksdb(pState, groupId, pVal);
if (code == TSDB_CODE_SUCCESS) {
tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);
}
return TSDB_CODE_FAILED;
return code;
}
*pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);

View File

@ -21,8 +21,6 @@
#include "ttimer.h"
#include "wal.h"
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
@ -261,8 +259,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->streamTaskId.taskId = taskId;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
@ -442,7 +440,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
}
streamTaskCleanCheckInfo(&pTask->taskCheckInfo);
streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
if (pTask->pState) {
stDebug("s-task:0x%x start to free task state", taskId);
@ -470,7 +468,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock);
pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
@ -571,8 +569,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pDownstreamUpdateList == NULL) {
pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -994,379 +992,3 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
}
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
taosArrayClear(pInfo->pList);
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
pInfo->notReadyTasks = 1;
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
}
pInfo->startTs = startTs;
return TSDB_CODE_SUCCESS;
}
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) {
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p->taskId == taskId) {
return p;
}
}
return NULL;
}
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
taosThreadMutexLock(&pInfo->checkInfoLock);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosArrayPush(pInfo->pList, &info);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
if (reqId != p->reqId) {
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
" expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
}
// subtract one not-ready-task, since it is ready now
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
} else {
*pNotReady = pInfo->notReadyTasks;
}
p->status = status;
p->rspTs = rspTs;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId,
reqId);
return TSDB_CODE_FAILED;
}
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
if (pInfo->inCheckProcess == 0) {
pInfo->inCheckProcess = 1;
} else {
ASSERT(pInfo->startTs > 0);
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
return TSDB_CODE_FAILED;
}
stDebug("s-task:%s set the in-check-procedure flag", id);
return 0;
}
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) {
if (!pInfo->inCheckProcess) {
stWarn("s-task:%s already not in-check-procedure", id);
}
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0;
pInfo->notReadyTasks = 0;
pInfo->inCheckProcess = 0;
pInfo->stopCheckProcess = 0;
taosArrayClear(pInfo->pList);
return 0;
}
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId,
.stage = pTask->pMeta->stage,
};
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = p->reqId;
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (p->taskId == pVgInfo->taskId) {
req.reqId = p->reqId;
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
p->reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break;
}
}
} else {
ASSERT(0);
}
}
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->status == TASK_DOWNSTREAM_READY) {
(*numOfReady) += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
p->taskId);
(*numOfFault) += 1;
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
ASSERT(p->status == -1);
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
}
} else {
taosArrayPush(pNotReadyList, &p->taskId);
}
}
}
}
static void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs;
ETaskStatus state = pStat->state;
const char* id = pTask->id.idStr;
int32_t numOfReady = 0;
int32_t numOfFault = 0;
int32_t numOfNotRsp = 0;
int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0;
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
return;
}
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return;
}
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->notReadyTasks == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
vgId, ref);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return;
}
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) {
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
}
numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
if (numOfFault > 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
return;
}
// checking of downstream tasks has been stopped by other threads
if (pInfo->stopCheckProcess == 1) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
"fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
return;
}
if (numOfNotReady > 0) { // check to make sure not in recheck timer
ASSERT(pTask->status.downstreamReady == 0);
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
}
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady);
}
if (numOfTimeout > 0) {
pInfo->startTs = now;
ASSERT(pTask->status.downstreamReady == 0);
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
}
}
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
}
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
}
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
} else {
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return 0;
}
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, id);
pInfo->stopCheckProcess = 1;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set stop check rsp mon", id);
return TSDB_CODE_SUCCESS;
}
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
ASSERT(pInfo->inCheckProcess == 0);
pInfo->pList = taosArrayDestroy(pInfo->pList);
if (pInfo->checkRspTmr != NULL) {
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
pInfo->checkRspTmr = NULL;
}
taosThreadMutexDestroy(&pInfo->checkInfoLock);
}

View File

@ -188,7 +188,7 @@ int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPre
int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) {
uLongf dstLen = outputSize - 1;
int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 9);
int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, lvl);
if (ret == Z_OK) {
output[0] = 1;
return dstLen + 1;