enh: port main change to 3.0

This commit is contained in:
Hongze Cheng 2024-06-26 10:32:15 +08:00
parent 59c8656e61
commit b761bd2e38
50 changed files with 1069 additions and 950 deletions

View File

@ -18,13 +18,13 @@
#include "os.h"
#include "streamState.h"
#include "streammsg.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "ttimer.h"
#include "streammsg.h"
#ifdef __cplusplus
extern "C" {
@ -761,7 +761,8 @@ tmr_h streamTimerGetInstance();
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, int32_t downstreamTaskId);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
int32_t downstreamTaskId);
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);

View File

@ -195,6 +195,7 @@ int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
tFreeSGetUserWhiteListRsp(&wlRsp);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int i = 0; i < wlRsp.numWhiteLists; ++i) {

View File

@ -61,7 +61,6 @@ void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
}
static int32_t stmtCreateRequest(STscStmt* pStmt) {
int32_t code = 0;
@ -338,7 +337,8 @@ int32_t stmtParseSql(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
STableDataCxt** pSrc = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataCxt** pSrc =
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pSrc || NULL == *pSrc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -381,7 +381,8 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
qDestroyBoundColInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags);
}
pStmt->bInfo.stbFName[0] = 0;;
pStmt->bInfo.stbFName[0] = 0;
;
return TSDB_CODE_SUCCESS;
}
@ -506,13 +507,11 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
pStmt->sql.siInfo.tableColsReady = true;
STMT_DLOG_E("end to free SQL info");
return TSDB_CODE_SUCCESS;
}
int32_t stmtTryAddTableVgroupInfo(STscStmt* pStmt, int32_t* vgId) {
if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) {
return TSDB_CODE_SUCCESS;
@ -529,7 +528,8 @@ int32_t stmtTryAddTableVgroupInfo(STscStmt* pStmt, int32_t* vgId) {
return code;
}
code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
code =
taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -539,7 +539,6 @@ int32_t stmtTryAddTableVgroupInfo(STscStmt* pStmt, int32_t* vgId) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
uint64_t suid, int32_t vgId) {
STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
@ -589,7 +588,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
if (pStmt->sql.autoCreateTbl) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
if (pCache) {
@ -720,7 +718,6 @@ int32_t stmtResetStmt(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
SStmtQNode* pParam = (SStmtQNode*)param;
@ -732,7 +729,8 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
} else {
STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock, &pStmt->sql.siInfo));
STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
&pStmt->sql.siInfo));
// taosMemoryFree(pParam->pTbData);
@ -741,7 +739,6 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
return TSDB_CODE_SUCCESS;
}
void* stmtBindThreadFunc(void* param) {
setThreadName("stmtBind");
@ -767,7 +764,6 @@ void *stmtBindThreadFunc(void *param) {
return NULL;
}
int32_t stmtStartBindThread(STscStmt* pStmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
@ -925,7 +921,6 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
return TSDB_CODE_SUCCESS;
}
@ -1032,7 +1027,8 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
if (pStmt->sql.stbInterlaceMode) {
pDataBlock = &pStmt->sql.siInfo.pDataCtx;
} else {
pDataBlock = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
pDataBlock =
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_APP_ERROR);
@ -1064,15 +1060,16 @@ SArray* stmtGetFreeCol(STscStmt* pStmt, int32_t* idx) {
int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) {
if (NULL == pStmt->sql.siInfo.pVgroupHash) {
pStmt->sql.siInfo.pVgroupHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
pStmt->sql.siInfo.pVgroupHash =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
}
if (NULL == pStmt->sql.siInfo.pVgroupList) {
pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
}
if (NULL == pStmt->sql.siInfo.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, (SRequestObj**)&pStmt->sql.siInfo.pRequest,
pStmt->reqid));
STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
(SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));
if (pStmt->reqid != 0) {
pStmt->reqid++;
@ -1083,7 +1080,8 @@ int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) {
pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
}
if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] && 0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
pStmt->sql.siInfo.tbFromHash = true;
}
@ -1232,9 +1230,11 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
if (colIdx < 0) {
if (pStmt->sql.stbInterlaceMode) {
(*pDataBlock)->pData->flags = 0;
code = qBindStmtStbColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo);
code = qBindStmtStbColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo);
} else {
code = qBindStmtColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
code =
qBindStmtColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
}
if (code) {
@ -1407,7 +1407,8 @@ int stmtStaticModeExec(TAOS_STMT* stmt) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList, pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
@ -1522,12 +1523,15 @@ int stmtClose(TAOS_STMT* stmt) {
pStmt->bindThreadInUse = false;
}
STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64 ", parseSqlNum=>%" PRId64
", pStmt->stat.bindDataNum=>%" PRId64 ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64 ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo, pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum,
pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND], pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE],
pStmt->stat.setTbNameUs, pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
stmtCleanSQLInfo(pStmt);

View File

@ -1031,7 +1031,6 @@ static void freeClientVg(void* param){
tOffsetDestroy(&pVg->offsetInfo.endOffset);
tOffsetDestroy(&pVg->offsetInfo.beginOffset);
tOffsetDestroy(&pVg->offsetInfo.committedOffset);
}
static void freeClientVgImpl(void* param) {
SMqClientTopic* pTopic = param;
@ -1282,7 +1281,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, strerror(code));
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
tmq->consumerId, strerror(code));
if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
code = 0;
}
@ -1672,7 +1672,8 @@ SMqBatchMetaRspObj* tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper)
void changeByteEndian(char* pData) {
char* p = pData;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | version:
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length | version:
int32_t blockVersion = *(int32_t*)p;
ASSERT(blockVersion == BLOCK_VERSION_1);
*(int32_t*)p = BLOCK_VERSION_2;
@ -1709,7 +1710,8 @@ static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, i
}
}
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObjCommon* pRspObj, SMqDataRspCommon* pDataRsp) {
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqRspObjCommon* pRspObj, SMqDataRspCommon* pDataRsp) {
(*numOfRows) = 0;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
@ -1790,6 +1792,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
code = TSDB_CODE_INVALID_MSG;
taosMemoryFreeClear(msg);
goto FAIL;
}
@ -2112,9 +2115,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
return NULL;
}
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset,
pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId,
pDataRsp->blockNum != 0);
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever,
tmq->consumerId, pDataRsp->blockNum != 0);
if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
@ -2137,8 +2139,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
pollRspWrapper->reqId);
taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock);

View File

@ -83,10 +83,10 @@ int32_t s3CheckCfg() {
snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
snprintf(path + tmp_len, PATH_MAX, "%s", TD_DIRSEP);
snprintf(path + tmp_len + ds_len, PATH_MAX, "%s", objectname[0]);
snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]);
} else {
snprintf(path + tmp_len, PATH_MAX, "%s", objectname[0]);
snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]);
}
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
@ -1075,32 +1075,32 @@ static SArray *getListByPrefix(const char *prefix) {
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&listBucketCallback};
const char *marker = 0, *delimiter = 0;
int maxkeys = 0, allDetails = 0;
const char /**marker = 0,*/ *delimiter = 0;
int /*maxkeys = 0, */ allDetails = 0;
list_bucket_callback_data data = {0};
data.objectArray = taosArrayInit(32, sizeof(void *));
if (!data.objectArray) {
uError("%s: %s", __func__, "out of memoty");
return NULL;
}
if (marker) {
/*if (marker) {
snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker);
} else {
} else {*/
data.nextMarker[0] = 0;
}
//}
data.keyCount = 0;
data.allDetails = allDetails;
do {
data.isTruncated = 0;
do {
S3_list_bucket(&bucketContext, prefix, data.nextMarker, delimiter, maxkeys, 0, timeoutMsG, &listBucketHandler,
&data);
S3_list_bucket(&bucketContext, prefix, data.nextMarker, delimiter, 0 /*maxkeys*/, 0, timeoutMsG,
&listBucketHandler, &data);
} while (S3_status_is_retryable(data.status) && should_retry());
if (data.status != S3StatusOK) {
break;
}
} while (data.isTruncated && (!maxkeys || (data.keyCount < maxkeys)));
} while (data.isTruncated /* && (!maxkeys || (data.keyCount < maxkeys))*/);
if (data.status == S3StatusOK) {
if (data.keyCount > 0) {

View File

@ -20,7 +20,7 @@ int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) {
}
void cos_cp_close(TdFilePtr fd) { taosCloseFile(&fd); }
void cos_cp_remove(char const* filepath) { taosRemoveFile(filepath); }
void cos_cp_remove(char const* filepath) { (void)taosRemoveFile(filepath); }
static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
int32_t code = 0;
@ -380,7 +380,7 @@ void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPa
void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64) {
checkpoint->parts[part_index].completed = 1;
strncpy(checkpoint->parts[part_index].etag, etag, 128);
strncpy(checkpoint->parts[part_index].etag, etag, 127);
checkpoint->parts[part_index].crc64 = crc64;
}
@ -389,11 +389,12 @@ void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t
int i = 0;
checkpoint->cp_type = COS_CP_TYPE_UPLOAD;
strncpy(checkpoint->file_path, filepath, TSDB_FILENAME_LEN);
memset(checkpoint->file_path, 0, TSDB_FILENAME_LEN);
strncpy(checkpoint->file_path, filepath, TSDB_FILENAME_LEN - 1);
checkpoint->file_size = size;
checkpoint->file_last_modified = mtime;
strncpy(checkpoint->upload_id, upload_id, 128);
strncpy(checkpoint->upload_id, upload_id, 127);
checkpoint->part_size = part_size;
for (; i * part_size < size; i++) {

View File

@ -80,7 +80,8 @@ static int32_t generateConfigFile(char* confDir) {
"read only = false\n"
"list = false\n"
"[checkpoint]\n"
"path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir,
"path = %s",
tsCheckpointBackupDir, tsCheckpointBackupDir,
#ifdef WINDOWS
path
#else
@ -154,7 +155,6 @@ void startRsync() {
} else {
uDebug("[rsync] start server successful");
}
}
int32_t uploadByRsync(const char* id, const char* path) {
@ -264,7 +264,8 @@ int32_t downloadRsync(const char* id, const char* path) {
#endif
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX,
snprintf(
command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
tsLogDir, tsSnodeAddress, id,
#ifdef WINDOWS
@ -302,8 +303,8 @@ int32_t deleteRsync(const char* id) {
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/data/", tsLogDir,
tmp, tsSnodeAddress, id);
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/data/",
tsLogDir, tmp, tsSnodeAddress, id);
code = execCommand(command);
taosRemoveDir(tmp);

View File

@ -2983,7 +2983,7 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
}
} else {
if (varDataTLen(data + offset) > bytes) {
uError("var data length invalid, varDataTLen(data + offset):%d >= bytes:%d", (int)varDataTLen(data + offset),
uError("var data length invalid, varDataTLen(data + offset):%d > bytes:%d", (int)varDataTLen(data + offset),
bytes);
code = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto _exit;

View File

@ -392,9 +392,7 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
return 0;
}
struct SConfig *taosGetCfg() {
return tsCfg;
}
struct SConfig *taosGetCfg() { return tsCfg; }
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {

View File

@ -10522,7 +10522,10 @@ int32_t tCloneTbTSMAInfo(STableTSMAInfo *pInfo, STableTSMAInfo **pRes) {
pRet->ast = taosStrdup(pInfo->ast);
if (!pRet->ast) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code) tFreeTableTSMAInfo(pRet);
if (code) {
tFreeTableTSMAInfo(pRet);
pRet = NULL;
}
*pRes = pRet;
return code;
}

View File

@ -161,8 +161,7 @@ int32_t toIntegerPure(const char *z, int32_t n, int32_t base, int64_t *value) {
int32_t toIntegerEx(const char *z, int32_t n, uint32_t type, int64_t *value) {
errno = 0;
char *endPtr = NULL;
switch (type)
{
switch (type) {
case TK_NK_INTEGER: {
*value = taosStr2Int64(z, &endPtr, 10);
if (errno == ERANGE || errno == EINVAL || endPtr - z != n) {

View File

@ -53,7 +53,8 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
.msgType = TDMT_MND_RETRIEVE_IP_WHITE,
.info.ahandle = (void *)0x9527,
.info.refId = 0,
.info.noResp = 0};
.info.noResp = 0,
.info.handle = 0};
SEpSet epset = {0};
dmGetMnodeEpSet(pMgmt->pData, &epset);
@ -154,7 +155,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
.msgType = TDMT_MND_STATUS,
.info.ahandle = (void *)0x9527,
.info.refId = 0,
.info.noResp = 0};
.info.noResp = 0,
.info.handle = 0};
SRpcMsg rpcRsp = {0};
dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
@ -186,7 +188,8 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
.msgType = TDMT_MND_NOTIFY,
.info.ahandle = (void *)0x9527,
.info.refId = 0,
.info.noResp = 1};
.info.noResp = 1,
.info.handle = 0};
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);

View File

@ -290,6 +290,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
return -1;
} else {
pMsg->info.handle = 0;
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
return 0;
}

View File

@ -959,7 +959,8 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return -1;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0);
code =
setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0);
if (code != 0) {
taosMemoryFree(buf);
}
@ -1126,8 +1127,8 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
}
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
ready = false;
}
@ -1190,8 +1191,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
taosArrayPush(pList, &in);
int32_t currentSize = taosArrayGetSize(pList);
mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %ds(%" PRId64
"s) beyond threshold:%d",
mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %ds(%" PRId64 "s) beyond threshold:%d",
pStream->name, pStream->uid, tsStreamCheckpointInterval, duration / 1000, currentSize);
sdbRelease(pSdb, pStream);
@ -1915,7 +1915,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
{ // check for tasks, if tasks are not ready, not allowed to pause
bool found = false;
bool readyToPause = true;
@ -2563,8 +2562,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf",
req.streamId);
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans.
@ -2688,7 +2686,8 @@ void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
}
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, "update checkpoint-info");
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
"update checkpoint-info");
if (pTrans == NULL) {
return terrno;
}

View File

@ -52,7 +52,8 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
}
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
" reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
@ -205,7 +206,8 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receive pause stream:%s, %s, %"PRId64 ", because grant expired", pStream->name, reqPause.name, pStream->uid);
mInfo("receive pause stream:%s, %s, %" PRId64 ", because grant expired", pStream->name, reqPause.name,
pStream->uid);
}
sdbRelease(pSdb, pStream);

View File

@ -43,7 +43,8 @@ int32_t mndStreamClearFinishedTrans(SMnode* pMnode, int32_t* pNumOfActiveChkpt)
void *pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, pEntry->startTime);
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
pEntry->startTime);
taosArrayPush(pList, &info);
} else {
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
@ -152,7 +153,8 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamId) {
return 0;
}
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg) {
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
const char *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
@ -248,7 +250,11 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode, int32_t acceptCode) {
STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode,
STransAction action = {.epSet = *pEpset,
.contLen = contLen,
.pCont = pCont,
.msgType = msgType,
.retryCode = retryCode,
.acceptableCode = acceptCode};
return mndTransAppendRedoAction(pTrans, &action);
}
@ -300,5 +306,3 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("complete clear checkpoints in Dbs");
}

View File

@ -95,8 +95,8 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
return pSub;
}
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
SSubplan *pPlan) {
SMqRebVgReq req = {0};
req.oldConsumerId = pRebVg->oldConsumerId;
req.newConsumerId = pRebVg->newConsumerId;
@ -227,8 +227,7 @@ static void pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, c
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(vgs);
SMqRebOutputVg outputVg = {consumerId, -1, pVgEp};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64,
key, pVgEp->vgId, consumerId);
mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pVgEp->vgId, consumerId);
}
static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
@ -253,7 +252,8 @@ static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, c
}
if (numOfRemoved != actualRemoved) {
mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved, actualRemoved);
mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
actualRemoved);
} else {
mInfo("[rebalance] sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
}
@ -411,7 +411,8 @@ static void processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, S
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
if (pVgEp->vgId == d1->vgId) {
jump = true;
mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
pConsumerEp->consumerId, pVgEp->vgId);
break;
}
}
@ -451,7 +452,8 @@ static void printRebalanceLog(SMqRebOutputObj *pOutput){
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz);
mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
@ -460,8 +462,8 @@ static void printRebalanceLog(SMqRebOutputObj *pOutput){
}
}
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey,
int32_t *minVgCnt, int32_t *remainderVgCnt){
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
int32_t *remainderVgCnt) {
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
@ -473,7 +475,8 @@ static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, con
} else {
mInfo("[rebalance] sub:%s no consumer subscribe this topic", pSubKey);
}
mInfo("[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
mInfo(
"[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
}
@ -498,8 +501,8 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min
pRebVg = (SMqRebOutputVg *)pAssignIter;
pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average",
pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
}
@ -519,8 +522,8 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min
pRebVg = (SMqRebOutputVg *)pAssignIter;
pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1",
pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
}
@ -732,8 +735,10 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus);
mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
pConsumer->createTime, hbStatus);
if (status == MQ_CONSUMER_STATUS_READY) {
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
@ -762,8 +767,7 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
if (old > 0) mInfo("[rebalance] counter old val:%d", old)
return old == 0;
if (old > 0) mInfo("[rebalance] counter old val:%d", old) return old == 0;
}
void mndRebCntInc() {
@ -875,8 +879,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mDebug("[rebalance] start to process mq timer")
if (!mndRebTryStart()) {
mInfo("[rebalance] mq rebalance already in progress, do nothing")
return code;
mInfo("[rebalance] mq rebalance already in progress, do nothing") return code;
}
SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
@ -957,7 +960,8 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);;
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
;
action.pCont = pReq;
action.contLen = sizeof(SMqVDeleteReq);
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
@ -1262,7 +1266,6 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
if (pIter == NULL) break;
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
@ -1340,7 +1343,8 @@ END:
return code;
}
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic,
const char *cgroup, SArray *vgs, SArray *offsetRows) {
int32_t sz = taosArrayGetSize(vgs);
for (int32_t j = 0; j < sz; j++) {
SMqVgEp *pVgEp = taosArrayGetP(vgs, j);
@ -1366,8 +1370,8 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1);
mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
consumerId, varDataVal(cgroup), pVgEp->vgId);
mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
varDataVal(cgroup), pVgEp->vgId);
// offset
OffsetRows *data = NULL;
@ -1435,7 +1439,8 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
if (pIter == NULL) break;
pConsumerEp = (SMqConsumerEp *)pIter;
buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows);
buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs,
pConsumerEp->offsetRows);
}
buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows);

View File

@ -71,7 +71,8 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
startRsync();
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE,
taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
@ -140,7 +141,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_DEPLOY: {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return tqStreamTaskProcessDeployReq(pSnode->pMeta, &pSnode->msgCb,pMsg->info.conn.applyIndex, pReq, len, true, true);
return tqStreamTaskProcessDeployReq(pSnode->pMeta, &pSnode->msgCb, pMsg->info.conn.applyIndex, pReq, len, true,
true);
}
case TDMT_STREAM_TASK_DROP:

View File

@ -686,7 +686,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
}
// add to cache.
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
taosThreadMutexUnlock(pLock);

View File

@ -647,7 +647,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SCtbIdxKey *table = (SCtbIdxKey *)pKey;
STagVal tagVal = {.cid = pCol->colId};
tTagGet((const STag *)pVal, &tagVal);
if (tTagGet((const STag *)pVal, &tagVal)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
@ -655,6 +655,11 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pCol->type)) {
nTagData = tDataTypes[pCol->type].bytes;
}
}
rc = metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey);
tdbFree(pKey);
tdbFree(pVal);
@ -779,7 +784,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
SCtbIdxKey *table = (SCtbIdxKey *)pKey;
STagVal tagVal = {.cid = pCol->colId};
tTagGet((const STag *)pVal, &tagVal);
if (tTagGet((const STag *)pVal, &tagVal)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
@ -787,6 +792,11 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pCol->type)) {
nTagData = tDataTypes[pCol->type].bytes;
}
}
rc = metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey);
tdbFree(pKey);
tdbFree(pVal);
@ -1272,7 +1282,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
int32_t nTagData = 0;
STagVal tagVal = {.cid = pTagColumn->colId};
tTagGet((const STag *)e.ctbEntry.pTags, &tagVal);
if (tTagGet((const STag *)e.ctbEntry.pTags, &tagVal)) {
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
@ -1280,6 +1290,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pTagColumn->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pTagColumn->type)) {
nTagData = tDataTypes[pTagColumn->type].bytes;
}
}
if (metaCreateTagIdxKey(e.ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, uid,
&pTagIdxKey, &nTagIdxKey) == 0) {
@ -2036,7 +2051,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
int32_t nTagData = 0;
STagVal tagVal = {.cid = pCol->colId};
tTagGet((const STag *)pVal, &tagVal);
if (tTagGet((const STag *)pVal, &tagVal)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
@ -2044,6 +2059,11 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pCol->type)) {
nTagData = tDataTypes[pCol->type].bytes;
}
}
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
tdbFree(pKey);
tdbFree(pVal);
@ -2497,11 +2517,13 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
goto end;
} else {
for (int i = 0; i < pTagSchema->nCols; i++) {
pTagData = NULL;
nTagData = 0;
pTagColumn = &pTagSchema->pSchema[i];
if (!IS_IDX_ON(pTagColumn)) continue;
STagVal tagVal = {.cid = pTagColumn->colId};
tTagGet((const STag *)pCtbEntry->ctbEntry.pTags, &tagVal);
if (tTagGet((const STag *)pCtbEntry->ctbEntry.pTags, &tagVal)) {
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
@ -2509,7 +2531,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pTagColumn->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pTagColumn->type)) {
nTagData = tDataTypes[pTagColumn->type].bytes;
}
}
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
ret = -1;

View File

@ -697,6 +697,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
@ -963,7 +964,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping and drop it", id);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);

View File

@ -160,7 +160,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId, req.taskId);
tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
@ -194,7 +195,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (pReqTask != NULL) {
tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId, req.transId);
tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
@ -838,6 +840,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, true);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
@ -883,7 +886,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
pTask->id.idStr, (int32_t)pReq->downstreamTaskId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
@ -901,7 +905,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
// re-send the lost checkpoint-trigger msg to downstream task
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_SUCCESS);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
@ -914,7 +919,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"sending checkpoint-source/trigger",
pTask->id.idStr, recv, total);
}
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
} else { // upstream not recv the checkpoint-source/trigger till now
ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT);
@ -922,7 +928,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
pTask->id.idStr);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
streamMetaReleaseTask(pMeta, pTask);

View File

@ -771,6 +771,7 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
STSchema *pTSchema = NULL;
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexUnlock(&pTsdb->lruMutex);
terrno = code;
return -1;
}

View File

@ -49,7 +49,8 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey,
STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList,
@ -68,7 +69,8 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey,
STsdbReader* pReader);
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
@ -363,9 +365,7 @@ _err:
return code;
}
bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) {
return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type);
}
bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp) { return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); }
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) {
pIter->order = order;
@ -1417,7 +1417,8 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
}
// todo: this attribute could be acquired during extractin the global ordered block list.
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order, int32_t pkType, int32_t numOfPk) {
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order, int32_t pkType,
int32_t numOfPk) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) {
if (pBlock->lastKey == pRec->firstKey.key.ts) {
@ -2077,7 +2078,8 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
return code;
}
static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader) {
SRowKey rowKey = {0};
while (1) {
@ -2362,8 +2364,9 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
if (IS_VAR_DATA_TYPE(pScanInfo->lastProcKey.pks[0].type)) {
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " pk:%s %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pScanInfo->lastProcKey.pks[0].pData, pReader->idStr);
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " pk:%s %s", pRow->pBlockData, pRow->iRow,
pSttBlockReader->uid, fRow.pBlockData->aTSKEY[fRow.iRow], pScanInfo->lastProcKey.pks[0].pData,
pReader->idStr);
}
int32_t code =
@ -2513,7 +2516,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
return code;
}
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
goto _end;
}
@ -3572,7 +3576,8 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
pReader->suppInfo.numOfPks > 0);
if (!dropped) {
return pRow;
}
@ -3597,7 +3602,8 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
pReader->suppInfo.numOfPks > 0);
if (!dropped) {
return pRow;
}
@ -3698,7 +3704,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return code;
}
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader) {
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey,
STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
@ -3707,8 +3714,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
pDumpInfo->rowIndex += step;
if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pRange, step);
pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pRange, step);
}
// all rows are consumed, let's try next file block
@ -4815,7 +4821,6 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
return code;
}
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
// prepare for the next row scan
int32_t step = -1;
@ -4976,6 +4981,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
} else if (pAgg->colId < pSup->colId[j]) {
i += 1;
} else if (pSup->colId[j] < pAgg->colId) {
pResBlock->pBlockAgg[pSup->slotId[j]].colId = -1;
*allHave = false;
j += 1;
}
@ -5034,7 +5040,6 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
SReaderStatus* pStatus = &pTReader->status;
if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) {
// tsdbReaderSuspend2(pReader);
// tsdbReaderResume2(pReader);
@ -5466,8 +5471,7 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr;
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ }
void tsdbSetFilesetDelimited(STsdbReader* pReader) { pReader->bFilesetDelimited = true; }

View File

@ -1103,6 +1103,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
if (vnodeValidateTableHash(pVnode, tbName) < 0) {
cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
taosArrayPush(rsp.pArray, &cRsp);
vError("vgId:%d create-table:%s failed due to hash value mismatch", TD_VID(pVnode), tbName);
continue;
}

View File

@ -426,6 +426,7 @@ int32_t ctgdGetClusterCacheNum(SCatalog *pCtg, int32_t type) {
break;
case CTG_DBG_VIEW_NUM:
num += ctgdGetViewNum(dbCache);
break;
default:
ctgError("invalid type:%d", type);
break;

View File

@ -99,7 +99,7 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
@ -225,7 +225,7 @@ static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
@ -291,7 +291,6 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
@ -682,7 +681,7 @@ static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) {
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
@ -1402,7 +1401,6 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
@ -1712,7 +1710,6 @@ static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
@ -2004,7 +2001,7 @@ int32_t mAsofBackwardDumpGrpCache(SMJoinWindowCtx* pCtx) {
}
buildGrp.endIdx = buildGrp.readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
pCtx->cache.outRowIdx += rowsLeft;
break;
}

View File

@ -182,7 +182,7 @@ SSDataBlock* doSortMerge(SOperatorInfo* pOperator) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
(void)applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
if (p->info.rows > 0) {
break;

View File

@ -3902,17 +3902,19 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
if (pInput->rowIdx == -1) {
continue;
}
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
pInput->pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
SColumnInfoData* col = taosArrayGet(pInput->pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
__merge_compare_fn_t mergeCompareFn = (!pSubTblsInfo->pPkOrderInfo) ? subTblRowCompareTsFn : subTblRowCompareTsPkFn;
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, mergeCompareFn);
return TSDB_CODE_SUCCESS;
return tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, mergeCompareFn);
}
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {

View File

@ -51,7 +51,8 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) {
return TSDB_CODE_SUCCESS;
}
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) {
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
SStbInterlaceInfo* pBuildInfo) {
// merge according to vgId
return insAppendStmtTableDataCxt(pAllVgHash, pTbData, pTbCtx, pBuildInfo);
}
@ -70,12 +71,10 @@ int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDat
return code;
}
/*
int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pVgDataBlocks = NULL;
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t
tbNum) { int32_t code = TSDB_CODE_SUCCESS; SArray* pVgDataBlocks = NULL; SVnodeModifyOpStmt*
pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
// merge according to vgId
if (tbNum > 0) {
@ -217,6 +216,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
pTag = NULL;
end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
@ -227,6 +227,7 @@ end:
}
taosArrayDestroy(pTagArray);
taosArrayDestroy(tagName);
taosMemoryFree(pTag);
return code;
}
@ -243,7 +244,7 @@ int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND*
if (NULL == dst->length) {
dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num);
if (NULL == dst->buffer) {
if (NULL == dst->length) {
taosMemoryFreeClear(dst->buffer);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -276,7 +277,8 @@ int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND*
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema, SBindInfo* pBindInfos) {
int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo* pBindInfos) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
@ -306,7 +308,8 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
goto _return;
}
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
}
@ -325,8 +328,8 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
pBindInfos[c].bind = pBind;
pBindInfos[c].type = pColSchema->type;
//code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
//if (code) {
// code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes -
// VARSTR_HEADER_SIZE: -1); if (code) {
// goto _return;
// }
}
@ -362,7 +365,8 @@ int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, c
goto _return;
}
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
}
@ -377,7 +381,8 @@ int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, c
pBind = bind + c;
}
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
code = tColDataAddValueByBind(pCol, pBind,
IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
if (code) {
goto _return;
}
@ -393,9 +398,8 @@ _return:
return code;
}
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t rowNum) {
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen,
int32_t colIdx, int32_t rowNum) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
@ -524,7 +528,6 @@ int32_t qResetStmtColumns(SArray* pCols, bool deepClear) {
return TSDB_CODE_SUCCESS;
}
int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
STableDataCxt* pBlock = (STableDataCxt*)block;
int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
@ -589,7 +592,7 @@ int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool rese
pNewTb->pCreateTbReq = NULL;
pNewTb->aCol = taosArrayDup(pCxt->pData->aCol, NULL);
if (NULL == pNewTb) {
if (NULL == pNewTb->aCol) {
insDestroyTableDataCxt(*pDst);
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -115,8 +115,7 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
if (dbName == NULL) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (name[0] == '\0')
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
if (name[0] == '\0') return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
if (code != TSDB_CODE_SUCCESS) {
@ -487,8 +486,8 @@ int insColDataComp(const void* lp, const void* rp) {
return 0;
}
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId, STableColsData* pTbData, SName* sname) {
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
STableColsData* pTbData, SName* sname) {
if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
return TSDB_CODE_SUCCESS;
}
@ -512,8 +511,8 @@ int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuild
return TSDB_CODE_SUCCESS;
}
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData, uint64_t* uid, int32_t* vgId) {
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
uint64_t* uid, int32_t* vgId) {
STableVgUid* pTbInfo = NULL;
int32_t code = 0;
@ -558,7 +557,6 @@ int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo
return code;
}
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
@ -570,9 +568,8 @@ int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDa
return code;
}
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) {
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
SStbInterlaceInfo* pBuildInfo) {
int32_t code = TSDB_CODE_SUCCESS;
uint64_t uid;
int32_t vgId;
@ -627,8 +624,8 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
}
/*
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild, int32_t tbNum) {
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
if (NULL == pVgroupHash || NULL == pVgroupList) {
taosHashCleanup(pVgroupHash);
@ -875,7 +872,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
}
char* p = (char*)data;
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each column length |
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
// column length |
int32_t version = *(int32_t*)data;
p += sizeof(int32_t);
p += sizeof(int32_t);
@ -911,7 +909,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end;
}
if (tFields != NULL && numFields > boundInfo->numOfBound) {
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
if (errstr != NULL)
snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
@ -920,8 +919,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL) snprintf(errstr, errstrLen, "column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name, *(int32_t*)(fields + sizeof(int8_t)));
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
@ -951,8 +953,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pColSchema = &pSchema[j];
if (strcmp(pColSchema->name, tFields[i].name) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL) snprintf(errstr, errstrLen, "column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name, *(int32_t*)(fields + sizeof(int8_t)));
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}

View File

@ -8112,9 +8112,11 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
for (int32_t i = 1; i < num; ++i) {
SRetention* pRetension = taosArrayGet(dbCfg.pRetensions, i);
STranslateContext cxt = {0};
initTranslateContext(pCxt->pParseCxt, pCxt->pMetaCache, &cxt);
code = initTranslateContext(pCxt->pParseCxt, pCxt->pMetaCache, &cxt);
if (TSDB_CODE_SUCCESS == code) {
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, 1 == i ? &pReq->pAst1 : &pReq->pAst2,
1 == i ? &pReq->ast1Len : &pReq->ast2Len);
}
destroyTranslateContext(&cxt);
if (TSDB_CODE_SUCCESS != code) {
break;

View File

@ -90,7 +90,6 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qTaskInfo_t taskHandle = ctx->taskHandle;
@ -240,7 +239,6 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) {
return false;
}
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
int32_t taskNum = 0;
@ -439,7 +437,6 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
return TSDB_CODE_SUCCESS;
}
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32_t code) {
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL) {
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
@ -511,7 +508,6 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg
return TSDB_CODE_SUCCESS;
}
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
@ -747,7 +743,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(code);
}
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql,
OPTR_EXEC_MODEL_BATCH);
sql = NULL;
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
@ -1005,6 +1002,8 @@ _return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask);
} else {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
}
}
@ -1013,6 +1012,10 @@ _return:
}
if (ctx) {
if (qwMsg->connInfo.handle != ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
}
qwReleaseTaskCtx(mgmt, ctx);
}
@ -1067,7 +1070,6 @@ _return:
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0;
SSchedulerHbRsp rsp = {0};

View File

@ -194,9 +194,7 @@ int32_t getCfIdx(const char* cfName) {
return idx;
}
bool isValidCheckpoint(const char* dir) {
return true;
}
bool isValidCheckpoint(const char* dir) { return true; }
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later
@ -486,9 +484,7 @@ _ERROR:
return code;
}
int32_t backendCopyFiles(const char* src, const char* dst) {
return backendFileCopyFilesImpl(src, dst);
}
int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); }
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
const char* defaultPath) {
@ -540,7 +536,8 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
char* chkptPath = taosMemoryCalloc(1, pathLen);
if (chkptId > 0) {
snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId);
snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
chkptId);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath);
if (code != 0) {
@ -553,7 +550,8 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
code = 0; // reset the error code
}
} else { // no valid checkpoint id
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key, defaultPath);
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key,
defaultPath);
cleanDir(defaultPath, key);
}
@ -2143,13 +2141,18 @@ void taskDbDestroy(void* pDb, bool flush) {
rocksdb_flushoptions_destroy(flushOpt);
}
}
if (wrapper->pCf != NULL) {
for (int i = 0; i < nCf; i++) {
if (wrapper->pCf[i] != NULL) {
rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
}
}
}
if (wrapper->db) rocksdb_close(wrapper->db);
if (wrapper->db) {
rocksdb_close(wrapper->db);
}
rocksdb_options_destroy(wrapper->dbOpt);
rocksdb_readoptions_destroy(wrapper->readOpt);
@ -2205,7 +2208,8 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
return code;
}
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) {
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list,
const char* idStr) {
int32_t code = 0;
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
@ -2224,7 +2228,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return code;
}
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) {
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list,
const char* idStr) {
int32_t code = -1;
STaskDbWrapper* pDb = arg;
ECHECKPOINT_BACKUP_TYPE utype = type;

View File

@ -58,7 +58,8 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
if (pInfo->stage < stage) {
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
@ -70,7 +71,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
if (pInfo->stage != stage) {
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
@ -168,7 +168,8 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
if (pTask != NULL) {
pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
pRsp->status =
streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
@ -183,7 +184,6 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
}
}
}
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
@ -402,7 +402,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
if (reqId != p->reqId) {
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64
" expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
@ -509,8 +510,8 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, id,
pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
@ -760,4 +761,3 @@ void rspMonitorFn(void* param, void* tmrId) {
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
}

View File

@ -24,11 +24,13 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId);
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId);
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId) {
@ -362,7 +364,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
taosThreadMutexUnlock(&pInfo->lock);
if (notReady == 0) {
stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", id);
stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task",
id);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId);
}
@ -444,7 +447,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
// if (restored && (pStatus->state != TASK_STATUS__CK)) {
// stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
// stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%"
// PRId64
// " failed",
// pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
// taosThreadMutexUnlock(&pTask->lock);
@ -619,8 +623,8 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
idStr, checkpointId, el, path);
taosRemoveDir(path);
} else {
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs",
idStr, checkpointId, el);
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr,
checkpointId, el);
}
taosMemoryFree(path);
@ -641,7 +645,8 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
int64_t dbRefId = taskGetDBRef(pTask->pBackend);
void* pBackend = taskAcquireDb(dbRefId);
if (pBackend == NULL) {
stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr);
stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
pTask->id.idStr);
return -1;
}

View File

@ -119,7 +119,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
return code;
}
buf = NULL;
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
}
@ -310,7 +309,6 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
addDispatchEntry(&pTask->msgInfo, pTask->outputInfo.fixedDispatcher.nodeId, now, true);
pTask->msgInfo.pData = pReqs;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
@ -568,8 +566,8 @@ void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
}
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int64_t groupId, int64_t now) {
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId,
int64_t now) {
uint32_t hashValue = 0;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) {
@ -656,9 +654,7 @@ static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) {
pInfo->rspTs = recvTs;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; }
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
@ -719,8 +715,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->msgInfo.lock);
if (pTask->msgInfo.inMonitor == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, ref,
tstrerror(code));
stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
ref, tstrerror(code));
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
pTask->msgInfo.inMonitor = 1;
} else {
@ -880,7 +876,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
SRpcMsg msg = {0};
initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId, &msg);
initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId,
&msg);
tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x", id, pTask->info.taskLevel,
@ -918,9 +915,11 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
tmsgSendRsp(&pInfo->msg);
taosArrayClear(pList);
stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr,
pTask->info.taskLevel);
} else {
stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel);
stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr,
pTask->info.taskLevel);
}
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
@ -1049,7 +1048,8 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
return 0;
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask) {
STaskCheckpointReadyInfo info = {
.recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId};
@ -1105,8 +1105,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
STaskCheckpointReadyInfo info = {0};
initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64
"-0x%x (vgId:%d) idx:%d",
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d",
pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index);
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
@ -1301,12 +1300,14 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (notRsp > 0) {
stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting "
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, "
"waiting "
"for %d rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp);
} else {
stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp",
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all "
"rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
}
} else {

View File

@ -1052,12 +1052,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
}
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.failed =
((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId);
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr,
(*pTask)->chkInfo.pActiveInfo->transId);
}
}
@ -1725,7 +1727,8 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
" ms", id, vgId, transId, el);
" ms",
id, vgId, transId, el);
} else {
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
id, vgId, transId, el);

View File

@ -26,7 +26,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.delaySchedParam);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
pTask->schedInfo.pDelayTimer =
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}

View File

@ -105,9 +105,11 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
pTask->schedHistoryInfo.pTimer =
taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
@ -204,7 +206,8 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT && pStatus->state != TASK_STATUS__PAUSE) {
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT &&
pStatus->state != TASK_STATUS__PAUSE) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
@ -494,8 +497,8 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
// [pTask->dataRange.range.maxVer, ver1]
pTask->step2Range.minVer = walScanStartVer;
pTask->step2Range.maxVer = nextProcessVer - 1;
stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, pTask->id.idStr,
pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer);
stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64,
pTask->id.idStr, pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer);
return false;
}
}
@ -553,7 +556,8 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer);
}
}

View File

@ -208,9 +208,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
" nextProcessVer:%" PRId64 ", checkpointCount:%d",
taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount,
pStatis->latestUpdateTs, pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer,
pStatis->checkpoint);
taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
// remove the ref by timer
while (pTask->status.timerActive > 0) {
@ -414,7 +413,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -448,9 +448,7 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
}
}
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) {
return taosArrayGetSize(pTask->upstreamInfo.pList);
}
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
@ -862,7 +860,8 @@ void streamTaskResume(SStreamTask* pTask) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else {
stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name, pMeta->numOfPausedTasks);
stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
pMeta->numOfPausedTasks);
}
}
@ -979,9 +978,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
return streamTrySchedExec(pTask);
}
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) {
pTask->status.removeBackendFiles = true;
}
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
if (pTransId != NULL) {

View File

@ -200,7 +200,6 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve
SStreamTaskSM* pSM = pTask->status.pSM;
bool removed = false;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
for (int32_t i = 0; i < num; ++i) {
@ -218,7 +217,6 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name);
}
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "streammsg.h"
#include "os.h"
#include "tstream.h"
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
@ -77,7 +77,6 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp
return pEncoder->pos;
}
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
@ -99,7 +98,6 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
return pEncoder->pos;
}
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
@ -180,7 +178,6 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0;
}
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;

View File

@ -351,7 +351,12 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
}
pCell = tdbPageGetCell(btc.pPage, btc.idx);
tdbBtreeDecodeCell(btc.pPage, pCell, &cd, btc.pTxn, pBt);
ret = tdbBtreeDecodeCell(btc.pPage, pCell, &cd, btc.pTxn, pBt);
if (ret < 0) {
tdbBtcClose(&btc);
tdbError("tdb/btree-pget: decode cell failed with ret: %d.", ret);
return -1;
}
if (ppKey) {
pTKey = tdbRealloc(*ppKey, cd.kLen);
@ -1072,6 +1077,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
// pack partial val to local if any space left
if (nLocal > nHeader + kLen + sizeof(SPgno)) {
if (ASSERT(pVal != NULL && vLen != 0)) {
tdbFree(pBuf);
return -1;
}
memcpy(pCell + nHeader + kLen, pVal, nLocal - nHeader - kLen - sizeof(SPgno));
@ -1859,7 +1865,11 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
ret = tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btree-next: decode cell failed with ret: %d.", ret);
return -1;
}
pKey = tdbRealloc(*ppKey, cd.kLen);
if (pKey == NULL) {
@ -1918,7 +1928,11 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
ret = tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btree-prev: decode cell failed with ret: %d.", ret);
return -1;
}
pKey = tdbRealloc(*ppKey, cd.kLen);
if (pKey == NULL) {

View File

@ -672,6 +672,7 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_INIT(&conn->q);
tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
@ -754,6 +755,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_INIT(&conn->q);
tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
@ -806,6 +808,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->status = ConnInPool;
QUEUE_PUSH(&conn->list->conns, &conn->q);
conn->list->size += 1;
tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
if (conn->list->size >= 10) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
@ -825,6 +828,9 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
exh->refId = transAddExHandle(transGetRefMgt(), exh);
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
@ -849,6 +855,8 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
taosWLockLatch(&exh->latch);
exh->handle = conn;
exh->pThrd = conn->hostThrd;
taosWUnLockLatch(&exh->latch);
conn->refId = exh->refId;
taosWUnLockLatch(&exh->latch);
@ -911,13 +919,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
uv_timer_init(pThrd->loop, timer);
}
timer->data = conn;
conn->timer = timer;
conn->timer = timer;
conn->connReq.data = conn;
transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL);
transInitBuffer(&conn->readBuf);
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
@ -1027,7 +1033,7 @@ static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = transReqQueueRemove(req);
if (pConn == NULL) return;
SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
if (pMsg != NULL) {
int64_t cost = taosGetTimestampUs() - pMsg->st;
if (cost > 1000 * 50) {
@ -1133,6 +1139,7 @@ void cliSend(SCliConn* pConn) {
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
tDebug("malloc memory: %p", pMsg->pCont);
pMsg->contLen = 0;
}
@ -1579,7 +1586,7 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
tinet_ntoa(old, *v);
tinet_ntoa(new, addr);
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
}
}
return;
@ -1914,7 +1921,7 @@ void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
}
}
void cliIteraConnMsgs(SCliConn* conn) {
void cliConnFreeMsgs(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
@ -1956,7 +1963,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
}
}
cliIteraConnMsgs(conn);
cliConnFreeMsgs(conn);
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
destroyCmsg(pMsg);
@ -2017,6 +2024,7 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
if (pMsg == NULL) {
return;
}
tDebug("free memory:%p, free ctx: %p", pMsg, pMsg->ctx);
transDestroyConnCtx(pMsg->ctx);
transFreeMsg(pMsg->msg.pCont);
@ -2523,6 +2531,8 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) {
return NULL;
} else {
tDebug("conn %p got", exh->handle);
}
taosWLockLatch(&exh->latch);
if (exh->pThrd == NULL && trans != NULL) {
@ -2568,7 +2578,7 @@ int transReleaseCliHandle(void* handle) {
cmsg->ctx = pCtx;
STraceId* trace = &tmsg.info.traceId;
tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);
tGDebug("send release request at thread:%08" PRId64 ", malloc memory:%p", pThrd->pid, cmsg);
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
destroyCmsg(cmsg);
@ -2819,6 +2829,9 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
int64_t transAllocHandle() {
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
exh->refId = transAddExHandle(transGetRefMgt(), exh);
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);

View File

@ -305,7 +305,7 @@ void transCtxCleanup(STransCtx* ctx) {
ctx->freeFunc(iter->val);
iter = taosHashIterate(ctx->args, iter);
}
ctx->freeFunc(ctx->brokenVal.val);
if (ctx->freeFunc) ctx->freeFunc(ctx->brokenVal.val);
taosHashCleanup(ctx->args);
ctx->args = NULL;
}
@ -676,6 +676,10 @@ void transDestroyExHandle(void* handle) {
if (handle == NULL) {
return;
}
SExHandle* eh = handle;
if (!QUEUE_IS_EMPTY(&eh->q)) {
tDebug("handle %p mem leak", handle);
}
taosMemoryFree(handle);
}

View File

@ -1095,6 +1095,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
exh->handle = pConn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh);
QUEUE_INIT(&exh->q);
transAcquireExHandle(transGetRefMgt(), exh->refId);
STrans* pTransInst = pThrd->pTransInst;

View File

@ -14,8 +14,8 @@
*/
#include "os.h"
#include "ttypes.h"
#include "tcompression.h"
#include "ttypes.h"
int32_t getWordLength(char type) {
int32_t wordLength = 0;
@ -123,11 +123,11 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
__m512i sum_mask3 = _mm512_set_epi64(3, 3, 3, 3, 3, 3, 3, 3);
__m512i base = _mm512_set1_epi64(w);
__m512i maskVal = _mm512_set1_epi64(mask);
__m512i shiftBits = _mm512_set_epi64(bit * 7 + 4, bit * 6 + 4, bit * 5 + 4, bit * 4 + 4, bit * 3 + 4, bit * 2 + 4, bit + 4, 4);
__m512i shiftBits = _mm512_set_epi64(bit * 7 + 4, bit * 6 + 4, bit * 5 + 4, bit * 4 + 4, bit * 3 + 4,
bit * 2 + 4, bit + 4, 4);
__m512i inc = _mm512_set1_epi64(bit << 3);
for (int32_t i = 0; i < batch; ++i) {
__m512i after = _mm512_srlv_epi64(base, shiftBits);
__m512i zigzagVal = _mm512_and_si512(after, maskVal);
@ -142,28 +142,27 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
// decode[2] = decode[1] + final[2] -----> prevValue + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prevValue + final[0] + final[1] + final[2] + final[3]
//7 6 5 4 3 2 1 0
//D7 D6 D5 D4 D3 D2 D1 D0
//D6 0 D4 0 D2 0 D0 0
//D7+D6 D6 D5+D4 D4 D3+D2 D2 D1+D0 D0
//13 6 9 4 5 2 1 0
// 7 6 5 4 3 2 1
// 0 D7 D6 D5 D4 D3 D2 D1
// D0 D6 0 D4 0 D2 0 D0
// 0 D7+D6 D6 D5+D4 D4 D3+D2 D2
// D1+D0 D0 13 6 9 4 5 2
// 1 0
__m512i prev = _mm512_set1_epi64(prevValue);
__m512i cum_sum = _mm512_add_epi64(delta, _mm512_maskz_permutexvar_epi64(0xaa, sum_mask1, delta));
cum_sum = _mm512_add_epi64(cum_sum, _mm512_maskz_permutexvar_epi64(0xcc, sum_mask2, cum_sum));
cum_sum = _mm512_add_epi64(cum_sum, _mm512_maskz_permutexvar_epi64(0xf0, sum_mask3, cum_sum));
//13 6 9 4 5 2 1 0
//D7,D6 D6 D5,D4 D4 D3,D2 D2 D1,D0 D0
//+D5,D4 D5,D4, 0 0 D1,D0 D1,D0 0 0
//D7~D4 D6~D4 D5~D4 D4 D3~D0 D2~D0 D1~D0 D0
//22 15 9 4 6 3 1 0
// 13 6 9 4 5 2 1
// 0 D7,D6 D6 D5,D4 D4 D3,D2 D2
// D1,D0 D0 +D5,D4 D5,D4, 0 0 D1,D0 D1,D0
//0 0 D7~D4 D6~D4 D5~D4 D4 D3~D0 D2~D0
// D1~D0 D0 22 15 9 4 6 3
// 1 0
//
//D3~D0 D3~D0 D3~D0 D3~D0 0 0 0 0
//28 21 15 10 6 3 1 0
// D3~D0 D3~D0 D3~D0 D3~D0 0 0 0
// 0 28 21 15 10 6 3 1
// 0
cum_sum = _mm512_add_epi64(cum_sum, prev);
_mm512_storeu_si512((__m512i *)&p[_pos], cum_sum);
@ -171,7 +170,6 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
shiftBits = _mm512_add_epi64(shiftBits, inc);
prevValue = p[_pos + 7];
_pos += 8;
}
// handle the remain value
for (int32_t i = 0; i < remain; i++) {