Merge branch 'main' into enh/rocksdbSstate
This commit is contained in:
commit
cb223acf82
|
@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
|
||||||
|
|
||||||
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
|
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
|
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
|
||||||
|
|
||||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
|
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
|
||||||
|
|
||||||
void qStreamSetOpen(qTaskInfo_t tinfo);
|
void qStreamSetOpen(qTaskInfo_t tinfo);
|
||||||
|
|
|
@ -146,7 +146,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TSC_CONN_KILLED TAOS_DEF_ERROR_CODE(0, 0x0215)
|
#define TSDB_CODE_TSC_CONN_KILLED TAOS_DEF_ERROR_CODE(0, 0x0215)
|
||||||
#define TSDB_CODE_TSC_SQL_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x0216)
|
#define TSDB_CODE_TSC_SQL_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x0216)
|
||||||
#define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217)
|
#define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217)
|
||||||
#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218)
|
//#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218)
|
||||||
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219)
|
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219)
|
||||||
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A)
|
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A)
|
||||||
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B)
|
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B)
|
||||||
|
@ -261,6 +261,7 @@ int32_t* taosGetErrno();
|
||||||
// #define TSDB_CODE_MND_INVALID_STABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x036D) // 2.x
|
// #define TSDB_CODE_MND_INVALID_STABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x036D) // 2.x
|
||||||
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
|
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
|
||||||
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
|
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
|
||||||
|
#define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW TAOS_DEF_ERROR_CODE(0, 0x0370)
|
||||||
|
|
||||||
|
|
||||||
// mnode-func
|
// mnode-func
|
||||||
|
|
|
@ -169,6 +169,7 @@ typedef struct {
|
||||||
int32_t uid; // used for automatic create child table
|
int32_t uid; // used for automatic create child table
|
||||||
|
|
||||||
SHashObj *childTables;
|
SHashObj *childTables;
|
||||||
|
SHashObj *tableUids;
|
||||||
SHashObj *superTables;
|
SHashObj *superTables;
|
||||||
SHashObj *pVgHash;
|
SHashObj *pVgHash;
|
||||||
|
|
||||||
|
@ -242,6 +243,7 @@ int8_t smlGetTsTypeByLen(int32_t len);
|
||||||
SSmlTableInfo* smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen);
|
SSmlTableInfo* smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen);
|
||||||
SSmlSTableMeta* smlBuildSTableMeta(bool isDataFormat);
|
SSmlSTableMeta* smlBuildSTableMeta(bool isDataFormat);
|
||||||
int32_t smlSetCTableName(SSmlTableInfo *oneTable);
|
int32_t smlSetCTableName(SSmlTableInfo *oneTable);
|
||||||
|
void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo);
|
||||||
STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen);
|
STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen);
|
||||||
int32_t is_same_child_table_telnet(const void *a, const void *b);
|
int32_t is_same_child_table_telnet(const void *a, const void *b);
|
||||||
int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len);
|
int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len);
|
||||||
|
|
|
@ -195,6 +195,20 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo){
|
||||||
|
char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
|
||||||
|
size_t nLen = strlen(tinfo->childTableName);
|
||||||
|
memcpy(key, currElement->measure, currElement->measureLen);
|
||||||
|
memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen);
|
||||||
|
void *uid = taosHashGet(info->tableUids, key, currElement->measureLen + 1 + nLen); // use \0 as separator for stable name and child table name
|
||||||
|
if (uid == NULL) {
|
||||||
|
tinfo->uid = info->uid++;
|
||||||
|
taosHashPut(info->tableUids, key, currElement->measureLen + 1 + nLen, &tinfo->uid, sizeof(uint64_t));
|
||||||
|
}else{
|
||||||
|
tinfo->uid = *(uint64_t*)uid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
|
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
|
||||||
SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
|
SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
|
||||||
if (!meta) {
|
if (!meta) {
|
||||||
|
@ -1142,6 +1156,7 @@ void smlDestroyInfo(SSmlHandle *info) {
|
||||||
taosHashCleanup(info->pVgHash);
|
taosHashCleanup(info->pVgHash);
|
||||||
taosHashCleanup(info->childTables);
|
taosHashCleanup(info->childTables);
|
||||||
taosHashCleanup(info->superTables);
|
taosHashCleanup(info->superTables);
|
||||||
|
taosHashCleanup(info->tableUids);
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(info->tagJsonArray); i++) {
|
for (int i = 0; i < taosArrayGetSize(info->tagJsonArray); i++) {
|
||||||
cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i);
|
cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i);
|
||||||
|
@ -1192,6 +1207,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
|
||||||
|
|
||||||
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
info->tableUids = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
info->id = smlGenId();
|
info->id = smlGenId();
|
||||||
|
@ -1202,7 +1218,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
|
||||||
info->valueJsonArray = taosArrayInit(8, POINTER_BYTES);
|
info->valueJsonArray = taosArrayInit(8, POINTER_BYTES);
|
||||||
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
|
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
|
||||||
|
|
||||||
if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) {
|
if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables || NULL == info->tableUids) {
|
||||||
uError("create SSmlHandle failed");
|
uError("create SSmlHandle failed");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
@ -1320,23 +1336,23 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
if (info->pRequest->dbList == NULL) {
|
if (info->pRequest->dbList == NULL) {
|
||||||
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
||||||
}
|
}
|
||||||
void *data = taosArrayReserve(info->pRequest->dbList, 1);
|
char *data = (char*)taosArrayReserve(info->pRequest->dbList, 1);
|
||||||
memcpy(data, info->pRequest->pDb,
|
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
|
||||||
TSDB_DB_FNAME_LEN > strlen(info->pRequest->pDb) ? strlen(info->pRequest->pDb) : TSDB_DB_FNAME_LEN);
|
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
|
||||||
|
tNameGetFullDbName(&pName, data);
|
||||||
|
|
||||||
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
|
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
|
||||||
while (oneTable) {
|
while (oneTable) {
|
||||||
SSmlTableInfo *tableData = *oneTable;
|
SSmlTableInfo *tableData = *oneTable;
|
||||||
|
tstrncpy(pName.tname, tableData->sTableName, tableData->sTableNameLen + 1);
|
||||||
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
|
|
||||||
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
|
|
||||||
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
|
|
||||||
|
|
||||||
if (info->pRequest->tableList == NULL) {
|
if (info->pRequest->tableList == NULL) {
|
||||||
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
|
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
|
||||||
}
|
}
|
||||||
taosArrayPush(info->pRequest->tableList, &pName);
|
taosArrayPush(info->pRequest->tableList, &pName);
|
||||||
|
|
||||||
|
tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1);
|
||||||
|
|
||||||
SRequestConnInfo conn = {0};
|
SRequestConnInfo conn = {0};
|
||||||
conn.pTrans = info->taos->pAppInfo->pTransporter;
|
conn.pTrans = info->taos->pAppInfo->pTransporter;
|
||||||
conn.requestId = info->pRequest->requestId;
|
conn.requestId = info->pRequest->requestId;
|
||||||
|
@ -1428,6 +1444,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
|
||||||
|
|
||||||
taosHashClear(info->childTables);
|
taosHashClear(info->childTables);
|
||||||
taosHashClear(info->superTables);
|
taosHashClear(info->superTables);
|
||||||
|
taosHashClear(info->tableUids);
|
||||||
|
|
||||||
if (!info->dataFormat) {
|
if (!info->dataFormat) {
|
||||||
if (unlikely(info->lines != NULL)) {
|
if (unlikely(info->lines != NULL)) {
|
||||||
|
@ -1562,7 +1579,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
|
||||||
do {
|
do {
|
||||||
code = smlModifyDBSchemas(info);
|
code = smlModifyDBSchemas(info);
|
||||||
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|
||||||
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM) break;
|
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
|
||||||
|
|| code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break;
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
||||||
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
||||||
|
@ -1647,7 +1665,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
info->cost.endTime = taosGetTimestampUs();
|
info->cost.endTime = taosGetTimestampUs();
|
||||||
info->cost.code = code;
|
info->cost.code = code;
|
||||||
if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING ||
|
if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING ||
|
||||||
code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT) {
|
code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT ||
|
||||||
|
code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
if (cnt++ >= 10) {
|
if (cnt++ >= 10) {
|
||||||
uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -778,7 +778,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
|
||||||
tinfo->tags = taosArrayDup(preLineKV, NULL);
|
tinfo->tags = taosArrayDup(preLineKV, NULL);
|
||||||
|
|
||||||
smlSetCTableName(tinfo);
|
smlSetCTableName(tinfo);
|
||||||
tinfo->uid = info->uid++;
|
getTableUid(info, elements, tinfo);
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
info->currSTableMeta->uid = tinfo->uid;
|
info->currSTableMeta->uid = tinfo->uid;
|
||||||
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
|
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
|
||||||
|
|
|
@ -312,7 +312,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
}
|
}
|
||||||
|
|
||||||
smlSetCTableName(tinfo);
|
smlSetCTableName(tinfo);
|
||||||
tinfo->uid = info->uid++;
|
getTableUid(info, currElement, tinfo);
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
info->currSTableMeta->uid = tinfo->uid;
|
info->currSTableMeta->uid = tinfo->uid;
|
||||||
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
|
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
|
||||||
|
|
|
@ -206,7 +206,7 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
|
||||||
tinfo->tags = taosArrayDup(preLineKV, NULL);
|
tinfo->tags = taosArrayDup(preLineKV, NULL);
|
||||||
|
|
||||||
smlSetCTableName(tinfo);
|
smlSetCTableName(tinfo);
|
||||||
tinfo->uid = info->uid++;
|
getTableUid(info, elements, tinfo);
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
info->currSTableMeta->uid = tinfo->uid;
|
info->currSTableMeta->uid = tinfo->uid;
|
||||||
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
|
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
|
||||||
|
|
|
@ -120,6 +120,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
|
||||||
pColumnInfoData->varmeta.length += dataLen;
|
pColumnInfoData->varmeta.length += dataLen;
|
||||||
} else {
|
} else {
|
||||||
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
|
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
|
||||||
|
colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1949,12 +1950,11 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// for debug
|
// for debug
|
||||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
|
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
|
||||||
int32_t size = 2048;
|
int32_t size = 2048*1024;
|
||||||
*pDataBuf = taosMemoryCalloc(size, 1);
|
*pDataBuf = taosMemoryCalloc(size, 1);
|
||||||
char* dumpBuf = *pDataBuf;
|
char* dumpBuf = *pDataBuf;
|
||||||
char pBuf[128] = {0};
|
char pBuf[128] = {0};
|
||||||
|
@ -1970,7 +1970,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
|
len += snprintf(dumpBuf + len, size - len, "%s %d|", flag, j);
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
|
|
||||||
for (int32_t k = 0; k < colNum; k++) {
|
for (int32_t k = 0; k < colNum; k++) {
|
||||||
|
|
|
@ -50,6 +50,8 @@ void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *p
|
||||||
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
|
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
|
||||||
|
|
||||||
|
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -797,6 +797,11 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
|
||||||
|
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
|
||||||
SField *pField = taosArrayGet(pCreate->pColumns, i);
|
SField *pField = taosArrayGet(pCreate->pColumns, i);
|
||||||
SSchema *pSchema = &pDst->pColumns[i];
|
SSchema *pSchema = &pDst->pColumns[i];
|
||||||
|
@ -927,6 +932,11 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
|
||||||
|
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
|
||||||
SField *pField = taosArrayGet(createReq->pColumns, i);
|
SField *pField = taosArrayGet(createReq->pColumns, i);
|
||||||
SSchema *pSchema = &pDst->pColumns[i];
|
SSchema *pSchema = &pDst->pColumns[i];
|
||||||
|
@ -1154,6 +1164,11 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){
|
||||||
|
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < ntags; i++) {
|
for (int32_t i = 0; i < ntags; i++) {
|
||||||
SField *pField = taosArrayGet(pFields, i);
|
SField *pField = taosArrayGet(pFields, i);
|
||||||
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
|
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
|
||||||
|
@ -1461,6 +1476,11 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){
|
||||||
|
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < ncols; i++) {
|
for (int32_t i = 0; i < ncols; i++) {
|
||||||
SField *pField = taosArrayGet(pFields, i);
|
SField *pField = taosArrayGet(pFields, i);
|
||||||
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
|
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
|
||||||
|
|
|
@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER);
|
SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER);
|
||||||
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER)
|
||||||
useDb = taosHashIterate(pUser->writeTbs, useDb);
|
useDb = taosHashIterate(pUser->useDbs, useDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
|
||||||
|
|
|
@ -2006,7 +2006,7 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
|
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = NULL;
|
STrans *pTrans = NULL;
|
||||||
SSdbRaw *pRaw = NULL;
|
SSdbRaw *pRaw = NULL;
|
||||||
|
|
|
@ -255,14 +255,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
|
||||||
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||||
int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock);
|
|
||||||
int32_t tqNextBlockInWal(STqReader* pReader);
|
int32_t tqNextBlockInWal(STqReader* pReader);
|
||||||
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
|
||||||
|
|
||||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
|
||||||
bool tqNextBlockImpl(STqReader *pReader);
|
bool tqNextBlockImpl(STqReader *pReader);
|
||||||
|
|
||||||
|
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
||||||
|
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
||||||
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
||||||
|
|
||||||
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -214,7 +214,6 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit);
|
|
||||||
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
|
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
|
||||||
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
|
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -1082,12 +1082,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
if(taosHashGetSize(pTq->pPushMgr) > 0){
|
|
||||||
void *pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
if (taosHashGetSize(pTq->pPushMgr) > 0) {
|
||||||
while(pIter){
|
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
||||||
|
|
||||||
|
while (pIter) {
|
||||||
STqHandle* pHandle = *(STqHandle**)pIter;
|
STqHandle* pHandle = *(STqHandle**)pIter;
|
||||||
tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId);
|
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
||||||
if(ASSERT(pHandle->msg != NULL)){
|
|
||||||
|
if (ASSERT(pHandle->msg != NULL)) {
|
||||||
tqError("pHandle->msg should not be null");
|
tqError("pHandle->msg should not be null");
|
||||||
break;
|
break;
|
||||||
}else{
|
}else{
|
||||||
|
@ -1096,77 +1099,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
|
||||||
taosMemoryFree(pHandle->msg);
|
taosMemoryFree(pHandle->msg);
|
||||||
pHandle->msg = NULL;
|
pHandle->msg = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashClear(pTq->pPushMgr);
|
taosHashClear(pTq->pPushMgr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// unlock
|
// unlock
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
|
||||||
#if 0
|
|
||||||
void* pIter = NULL;
|
|
||||||
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT);
|
|
||||||
if (pSubmit == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("failed to create data submit for stream since out of memory");
|
|
||||||
saveOffsetForAllTasks(pTq, submit.ver);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
|
||||||
if (pIter == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
|
||||||
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
|
|
||||||
pTask->status.taskStatus);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if offset value exists
|
|
||||||
char key[128] = {0};
|
|
||||||
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
|
|
||||||
|
|
||||||
if (tInputQueueIsFull(pTask)) {
|
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
|
|
||||||
|
|
||||||
int64_t ver = submit.ver;
|
|
||||||
if (pOffset == NULL) {
|
|
||||||
doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver);
|
|
||||||
} else {
|
|
||||||
ver = pOffset->val.version;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver);
|
|
||||||
taosArrayPush(pInputQueueFullTasks, &pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if offset value exists
|
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
|
|
||||||
ASSERT(pOffset == NULL);
|
|
||||||
|
|
||||||
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamDataSubmitDestroy(pSubmit);
|
|
||||||
taosFreeQitem(pSubmit);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
tqStartStreamTasks(pTq);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,250 +16,10 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
#if 0
|
|
||||||
void tqTmrRspFunc(void* param, void* tmrId) {
|
|
||||||
STqHandle* pHandle = (STqHandle*)param;
|
|
||||||
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) {
|
|
||||||
SStreamDataSubmit* pSubmit = *ppSubmit;
|
|
||||||
while (pSubmit != NULL) {
|
|
||||||
if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
|
|
||||||
}
|
|
||||||
// update processed
|
|
||||||
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
|
|
||||||
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
|
|
||||||
streamDataSubmitDestroy(pSubmit);
|
|
||||||
if (pRsp->blockNum > 0) {
|
|
||||||
*ppSubmit = pSubmit;
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*ppSubmit = pSubmit;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
|
|
||||||
SMqDataRsp rsp = {0};
|
|
||||||
// 1. guard and set status executing
|
|
||||||
int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE,
|
|
||||||
TASK_EXEC_STATUS__EXECUTING);
|
|
||||||
if (execStatus == TASK_EXEC_STATUS__IDLE) {
|
|
||||||
SStreamDataSubmit* pSubmit = NULL;
|
|
||||||
// 2. check processedVer
|
|
||||||
// 2.1. if not missed, get msg from queue
|
|
||||||
// 2.2. if missed, scan wal
|
|
||||||
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
|
|
||||||
while (pHandle->pushHandle.processedVer <= pSubmit->ver) {
|
|
||||||
// read from wal
|
|
||||||
}
|
|
||||||
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
|
|
||||||
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
|
|
||||||
streamDataSubmitDestroy(pSubmit);
|
|
||||||
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
|
|
||||||
if (pSubmit == NULL) break;
|
|
||||||
}
|
|
||||||
// 3. exec, after each success, update processed ver
|
|
||||||
// first run
|
|
||||||
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
|
|
||||||
goto SEND_RSP;
|
|
||||||
}
|
|
||||||
// set exec status closing
|
|
||||||
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING);
|
|
||||||
// second run
|
|
||||||
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
|
|
||||||
goto SEND_RSP;
|
|
||||||
}
|
|
||||||
// set exec status idle
|
|
||||||
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
|
|
||||||
}
|
|
||||||
SEND_RSP:
|
|
||||||
// 4. if get result
|
|
||||||
// 4.1 set exec input status blocked and exec status idle
|
|
||||||
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
|
|
||||||
// 4.2 rpc send
|
|
||||||
rsp.rspOffset = pHandle->pushHandle.processedVer;
|
|
||||||
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
|
|
||||||
/*return -1;*/
|
|
||||||
/*}*/
|
|
||||||
// 4.3 clear rpc info
|
|
||||||
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) {
|
|
||||||
memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle));
|
|
||||||
pHandle->pushHandle.inputQ.queue = taosOpenQueue();
|
|
||||||
pHandle->pushHandle.inputQ.qall = taosAllocateQall();
|
|
||||||
if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) {
|
|
||||||
if (pHandle->pushHandle.inputQ.queue) {
|
|
||||||
taosCloseQueue(pHandle->pushHandle.inputQ.queue);
|
|
||||||
}
|
|
||||||
if (pHandle->pushHandle.inputQ.qall) {
|
|
||||||
taosFreeQall(pHandle->pushHandle.inputQ.qall);
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer,
|
|
||||||
int64_t timeout) {
|
|
||||||
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
|
|
||||||
atomic_store_64(&pHandle->pushHandle.reqId, reqId);
|
|
||||||
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
|
|
||||||
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
|
|
||||||
atomic_store_8(&pHandle->pushHandle.tmrStopped, 0);
|
|
||||||
taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
|
|
||||||
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
|
|
||||||
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
|
|
||||||
SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit);
|
|
||||||
if (pSubmitClone == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
|
|
||||||
//
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
|
|
||||||
if (msgType != TDMT_VND_SUBMIT) return 0;
|
|
||||||
void* pIter = NULL;
|
|
||||||
STqHandle* pHandle = NULL;
|
|
||||||
SSubmitReq* pReq = (SSubmitReq*)msg;
|
|
||||||
int32_t workerId = 4;
|
|
||||||
int64_t fetchOffset = ver;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pIter = taosHashIterate(pTq->pushMgr, pIter);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
pHandle = *(STqHandle**)pIter;
|
|
||||||
|
|
||||||
taosWLockLatch(&pHandle->pushHandle.lock);
|
|
||||||
|
|
||||||
SMqDataRsp rsp = {0};
|
|
||||||
rsp.reqOffset = pHandle->pushHandle.reqOffset;
|
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
|
||||||
tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
|
|
||||||
} else {
|
|
||||||
tqError("tq push unexpected msg type %d", msgType);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rsp.blockNum == 0) {
|
|
||||||
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp.rspOffset = fetchOffset;
|
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
// todo free
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
|
||||||
((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch;
|
|
||||||
((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId;
|
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
|
||||||
|
|
||||||
SRpcMsg resp = {
|
|
||||||
.info = pHandle->pushHandle.rpcInfo,
|
|
||||||
.pCont = buf,
|
|
||||||
.contLen = tlen,
|
|
||||||
.code = 0,
|
|
||||||
};
|
|
||||||
tmsgSendRsp(&resp);
|
|
||||||
|
|
||||||
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
|
|
||||||
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
|
||||||
|
|
||||||
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64,
|
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
|
|
||||||
rsp.reqOffset, rsp.rspOffset);
|
|
||||||
|
|
||||||
// TODO destroy
|
|
||||||
taosArrayDestroy(rsp.blockData);
|
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
|
||||||
// int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
|
||||||
// int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
tqProcessSubmitReqForSubscribe(pTq);
|
tqProcessSubmitReqForSubscribe(pTq);
|
||||||
// lock push mgr to avoid potential msg lost
|
|
||||||
// taosWLockLatch(&pTq->lock);
|
|
||||||
//
|
|
||||||
// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
|
||||||
// if (numOfRegisteredPush > 0) {
|
|
||||||
// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
|
||||||
// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
|
||||||
//
|
|
||||||
// void* data = taosMemoryMalloc(len);
|
|
||||||
// if (data == NULL) {
|
|
||||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
|
|
||||||
// taosWUnLockLatch(&pTq->lock);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// memcpy(data, pReq, len);
|
|
||||||
//
|
|
||||||
// SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
|
|
||||||
// void* pIter = NULL;
|
|
||||||
//
|
|
||||||
// while (1) {
|
|
||||||
// pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
|
||||||
// if (pIter == NULL) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
|
||||||
//
|
|
||||||
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
|
||||||
// if (pHandle == NULL) {
|
|
||||||
// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId,
|
|
||||||
// pPushEntry->subKey);
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// STqExecHandle* pExec = &pHandle->execHandle;
|
|
||||||
// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// doRemovePushedEntry(cachedKey, pTq);
|
|
||||||
// taosArrayDestroyEx(cachedKey, freeItem);
|
|
||||||
// taosMemoryFree(data);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // unlock
|
|
||||||
// taosWUnLockLatch(&pTq->lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
||||||
|
@ -275,8 +35,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
SPackedData submit = {0};
|
tqStartStreamTasks(pTq);
|
||||||
tqProcessSubmitReq(pTq, submit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType == TDMT_VND_DELETE) {
|
if (msgType == TDMT_VND_DELETE) {
|
||||||
|
@ -287,16 +46,16 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
STqHandle* pHandle = (STqHandle*) handle;
|
STqHandle* pHandle = (STqHandle*)handle;
|
||||||
if(pHandle->msg == NULL){
|
|
||||||
|
if (pHandle->msg == NULL) {
|
||||||
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||||
}else{
|
} else {
|
||||||
void *tmp = pHandle->msg->pCont;
|
void* tmp = pHandle->msg->pCont;
|
||||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||||
pHandle->msg->pCont = tmp;
|
pHandle->msg->pCont = tmp;
|
||||||
}
|
}
|
||||||
|
@ -304,7 +63,8 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
||||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||||
pHandle->msg->contLen = pMsg->contLen;
|
pHandle->msg->contLen = pMsg->contLen;
|
||||||
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
||||||
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
|
||||||
|
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -314,6 +74,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
|
||||||
|
|
||||||
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
|
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
|
||||||
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
|
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
|
||||||
|
|
||||||
if(pHandle->msg != NULL) {
|
if(pHandle->msg != NULL) {
|
||||||
tqPushDataRsp(pTq, pHandle);
|
tqPushDataRsp(pTq, pHandle);
|
||||||
|
|
||||||
|
@ -321,5 +82,6 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
|
||||||
taosMemoryFree(pHandle->msg);
|
taosMemoryFree(pHandle->msg);
|
||||||
pHandle->msg = NULL;
|
pHandle->msg = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -332,6 +332,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
||||||
|
|
||||||
// try next message in wal file
|
// try next message in wal file
|
||||||
|
// todo always retry to avoid read failure caused by wal file deletion
|
||||||
if (walNextValidMsg(pWalReader) < 0) {
|
if (walNextValidMsg(pWalReader) < 0) {
|
||||||
return FETCH_TYPE__NONE;
|
return FETCH_TYPE__NONE;
|
||||||
}
|
}
|
||||||
|
@ -374,7 +375,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
|
|
||||||
if (pReader->tbIdHash == NULL) {
|
if (pReader->tbIdHash == NULL) {
|
||||||
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pReader, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
||||||
return FETCH_TYPE__DATA;
|
return FETCH_TYPE__DATA;
|
||||||
}
|
}
|
||||||
|
@ -384,7 +385,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
if (ret != NULL) {
|
if (ret != NULL) {
|
||||||
tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver);
|
tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver);
|
||||||
|
|
||||||
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pReader, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
||||||
return FETCH_TYPE__DATA;
|
return FETCH_TYPE__DATA;
|
||||||
}
|
}
|
||||||
|
@ -399,31 +400,6 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) {
|
|
||||||
while (1) {
|
|
||||||
if (pReader->msg.msgStr == NULL) {
|
|
||||||
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
|
||||||
return FETCH_TYPE__NONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
|
||||||
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
|
||||||
int64_t ver = pReader->pWalReader->pHead->head.version;
|
|
||||||
|
|
||||||
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (tqNextBlockImpl(pReader)) {
|
|
||||||
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
|
|
||||||
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
return FETCH_TYPE__DATA;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
|
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
|
||||||
pReader->msg.msgStr = msgStr;
|
pReader->msg.msgStr = msgStr;
|
||||||
pReader->msg.msgLen = msgLen;
|
pReader->msg.msgLen = msgLen;
|
||||||
|
@ -527,7 +503,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
|
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
|
||||||
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
||||||
|
@ -535,6 +511,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
*pSubmitTbDataRet = pSubmitTbData;
|
*pSubmitTbDataRet = pSubmitTbData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = pReader->pResBlock;
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
int32_t sversion = pSubmitTbData->sver;
|
int32_t sversion = pSubmitTbData->sver;
|
||||||
|
@ -603,7 +580,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
||||||
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
j++;
|
j++;
|
||||||
|
@ -622,7 +599,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
|
|
||||||
if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
|
if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlock->info.rows = numOfRows;
|
pBlock->info.rows = numOfRows;
|
||||||
|
@ -638,7 +615,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
while (targetIdx < colActual) {
|
while (targetIdx < colActual) {
|
||||||
if (sourceIdx >= numOfCols) {
|
if (sourceIdx >= numOfCols) {
|
||||||
tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
|
tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SColData* pCol = taosArrayGet(pCols, sourceIdx);
|
SColData* pCol = taosArrayGet(pCols, sourceIdx);
|
||||||
|
@ -647,7 +624,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
|
|
||||||
if (pCol->nVal != numOfRows) {
|
if (pCol->nVal != numOfRows) {
|
||||||
tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
|
tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCol->cid < pColData->info.colId) {
|
if (pCol->cid < pColData->info.colId) {
|
||||||
|
@ -661,14 +638,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
||||||
varDataSetLen(val, colVal.value.nData);
|
varDataSetLen(val, colVal.value.nData);
|
||||||
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
colDataSetNULL(pColData, i);
|
colDataSetNULL(pColData, i);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -710,14 +687,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
||||||
varDataSetLen(val, colVal.value.nData);
|
varDataSetLen(val, colVal.value.nData);
|
||||||
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
colDataSetNULL(pColData, i);
|
colDataSetNULL(pColData, i);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -735,10 +712,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
FAIL:
|
|
||||||
blockDataFreeRes(pBlock);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
|
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
|
||||||
|
|
|
@ -66,9 +66,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
|
||||||
|
|
||||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||||
const int32_t MAX_ROWS_TO_RETURN = 4096;
|
const int32_t MAX_ROWS_TO_RETURN = 4096;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
int32_t code = 0;
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
int32_t totalRows = 0;
|
int32_t code = 0;
|
||||||
|
int32_t totalRows = 0;
|
||||||
|
|
||||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
|
|
|
@ -175,7 +175,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||||
// lock
|
// lock
|
||||||
|
@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
|
|
||||||
|
|
||||||
if (offset->type == TMQ_OFFSET__LOG) {
|
if (offset->type == TMQ_OFFSET__LOG) {
|
||||||
|
verifyOffset(pHandle->pWalReader, offset);
|
||||||
int64_t fetchVer = offset->version + 1;
|
int64_t fetchVer = offset->version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
|
@ -361,11 +362,10 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
||||||
// this is a normal subscribe requirement
|
// this is a normal subscribe requirement
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
|
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
|
||||||
|
} else { // todo handle the case where re-balance occurs.
|
||||||
|
// for taosx
|
||||||
|
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle the case where re-balance occurs.
|
|
||||||
// for taosx
|
|
||||||
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
|
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
|
||||||
|
|
|
@ -458,7 +458,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
walApplyVer(pVnode->pWal, version);
|
walApplyVer(pVnode->pWal, version);
|
||||||
|
|
||||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||||
// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
|
|
||||||
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ typedef struct {
|
||||||
STqOffsetVal currentOffset; // for tmq
|
STqOffsetVal currentOffset; // for tmq
|
||||||
SMqMetaRsp metaRsp; // for tmq fetching meta
|
SMqMetaRsp metaRsp; // for tmq fetching meta
|
||||||
int64_t snapshotVer;
|
int64_t snapshotVer;
|
||||||
SPackedData submit; // todo remove it
|
// SPackedData submit; // todo remove it
|
||||||
SSchemaWrapper* schema;
|
SSchemaWrapper* schema;
|
||||||
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
||||||
int8_t recoverStep;
|
int8_t recoverStep;
|
||||||
|
|
|
@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
|
||||||
|
// if offset version is small than first version , let's seek to first version
|
||||||
|
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
|
||||||
|
if (pOffset->version + 1 < firstVer){
|
||||||
|
pOffset->version = firstVer - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
|
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||||
|
@ -1080,15 +1088,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo;
|
STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo;
|
||||||
|
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
|
// todo refactor: move away
|
||||||
tsdbReaderClose(pScanBaseInfo->dataReader);
|
tsdbReaderClose(pScanBaseInfo->dataReader);
|
||||||
pScanBaseInfo->dataReader = NULL;
|
pScanBaseInfo->dataReader = NULL;
|
||||||
|
|
||||||
// let's seek to the next version in wal file
|
verifyOffset(pInfo->tqReader->pWalReader, pOffset);
|
||||||
int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal);
|
|
||||||
if (pOffset->version + 1 < firstVer){
|
|
||||||
pOffset->version = firstVer - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||||
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -82,7 +82,7 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC
|
||||||
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
||||||
bool createDummyCol);
|
bool createDummyCol);
|
||||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo, int32_t threshold);
|
||||||
|
|
||||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
|
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
|
||||||
SFilePage* pData = NULL;
|
SFilePage* pData = NULL;
|
||||||
|
@ -777,7 +777,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo) {
|
SGroupResInfo* pGroupResInfo, int32_t threshold) {
|
||||||
SExprInfo* pExprInfo = pSup->pExprInfo;
|
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||||
int32_t numOfExprs = pSup->numOfExprs;
|
int32_t numOfExprs = pSup->numOfExprs;
|
||||||
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
||||||
|
@ -826,6 +826,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
|
||||||
|
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
|
if (pBlock->info.rows >= threshold) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||||
|
@ -835,6 +838,34 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
|
SDiskbasedBuf* pBuf) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSDataBlock* pBlock = pbInfo->pRes;
|
||||||
|
|
||||||
|
// set output datablock version
|
||||||
|
pBlock->info.version = pTaskInfo->version;
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
if (!hasRemainResults(pGroupResInfo)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// clear the existed group id
|
||||||
|
pBlock->info.id.groupId = 0;
|
||||||
|
ASSERT(!pbInfo->mergeResultBlock);
|
||||||
|
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
|
||||||
|
|
||||||
|
void* tbname = NULL;
|
||||||
|
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
|
||||||
|
pBlock->info.parTbName[0] = 0;
|
||||||
|
} else {
|
||||||
|
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||||
|
}
|
||||||
|
tdbFree(tbname);
|
||||||
|
}
|
||||||
|
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
SDiskbasedBuf* pBuf) {
|
SDiskbasedBuf* pBuf) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -851,10 +882,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
// clear the existed group id
|
// clear the existed group id
|
||||||
pBlock->info.id.groupId = 0;
|
pBlock->info.id.groupId = 0;
|
||||||
if (!pbInfo->mergeResultBlock) {
|
if (!pbInfo->mergeResultBlock) {
|
||||||
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
|
||||||
} else {
|
} else {
|
||||||
while (hasRemainResults(pGroupResInfo)) {
|
while (hasRemainResults(pGroupResInfo)) {
|
||||||
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
|
||||||
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
|
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1613,6 +1613,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
qDebug("start to exec queue scan, %s", id);
|
qDebug("start to exec queue scan, %s", id);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
|
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
|
||||||
if (pInfo->tqReader->msg.msgStr == NULL) {
|
if (pInfo->tqReader->msg.msgStr == NULL) {
|
||||||
SPackedData submit = pTaskInfo->streamInfo.submit;
|
SPackedData submit = pTaskInfo->streamInfo.submit;
|
||||||
|
@ -1626,7 +1627,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
|
|
||||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
while (tqNextBlockImpl(pInfo->tqReader)) {
|
||||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1642,6 +1643,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
pTaskInfo->streamInfo.submit = (SPackedData){0};
|
pTaskInfo->streamInfo.submit = (SPackedData){0};
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
|
@ -1659,10 +1661,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t type = tqNextBlockInWal(pInfo->tqReader);
|
int32_t type = tqNextBlockInWal(pInfo->tqReader);
|
||||||
SSDataBlock* pRes = pInfo->tqReader->pResBlock;
|
SSDataBlock* pRes = pInfo->tqReader->pResBlock;
|
||||||
|
@ -2074,7 +2078,7 @@ FETCH_NEXT_BLOCK:
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
while (tqNextBlockImpl(pInfo->tqReader)) {
|
||||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2105,7 +2109,6 @@ FETCH_NEXT_BLOCK:
|
||||||
// record the scan action.
|
// record the scan action.
|
||||||
pInfo->numOfExec++;
|
pInfo->numOfExec++;
|
||||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||||
// printDataBlock(pInfo->pRes, "stream scan");
|
|
||||||
|
|
||||||
qDebug("scan rows: %" PRId64, pBlockInfo->rows);
|
qDebug("scan rows: %" PRId64, pBlockInfo->rows);
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
|
|
|
@ -263,6 +263,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
int16_t times = 0;
|
int16_t times = 0;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
|
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
|
@ -272,6 +274,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
qDebug("===stream===try agian batchSize:%d", batchSize);
|
qDebug("===stream===try agian batchSize:%d", batchSize);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,7 +319,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize);
|
qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize);
|
||||||
|
|
||||||
streamTaskExecImpl(pTask, pInput, pRes);
|
streamTaskExecImpl(pTask, pInput, pRes);
|
||||||
|
|
||||||
|
|
|
@ -239,6 +239,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
}
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
|
|
|
@ -122,7 +122,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist")
|
//TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line")
|
||||||
|
@ -203,6 +203,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow")
|
||||||
|
|
||||||
// mnode-func
|
// mnode-func
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name")
|
||||||
|
|
|
@ -553,6 +553,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sysinfo.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sysinfo.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_manage.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_manage.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py
|
||||||
#,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
|
#,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
|
||||||
|
|
|
@ -58,16 +58,16 @@ if $data23 != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ========== stop dnode2
|
#print ========== stop dnode2
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
|
#system sh/exec.sh -n dnode2 -s stop -x SIGKILL
|
||||||
|
|
||||||
sleep 1000
|
#sleep 1000
|
||||||
print =============== drop database
|
#print =============== drop database
|
||||||
sql_error drop database d1
|
sql drop database d1
|
||||||
|
|
||||||
print ========== start dnode2
|
#print ========== start dnode2
|
||||||
system sh/exec.sh -n dnode2 -s start
|
#system sh/exec.sh -n dnode2 -s start
|
||||||
sleep 1000
|
#sleep 1000
|
||||||
|
|
||||||
print =============== re-create database
|
print =============== re-create database
|
||||||
$x = 0
|
$x = 0
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import taos
|
||||||
|
from taos.tmq import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.common import *
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.sqlset import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
self.setsql = TDSetSql()
|
||||||
|
self.stbname = 'stb'
|
||||||
|
self.binary_length = 20 # the length of binary for column_dict
|
||||||
|
self.nchar_length = 20 # the length of nchar for column_dict
|
||||||
|
self.column_dict = {
|
||||||
|
'ts': 'timestamp',
|
||||||
|
'col1': 'float',
|
||||||
|
'col2': 'int',
|
||||||
|
'col3': 'float',
|
||||||
|
}
|
||||||
|
|
||||||
|
self.tag_dict = {
|
||||||
|
't1': 'int',
|
||||||
|
't2': f'binary({self.binary_length})'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.tag_list = [
|
||||||
|
f'1, "Beijing"',
|
||||||
|
f'2, "Shanghai"',
|
||||||
|
f'3, "Guangzhou"',
|
||||||
|
f'4, "Shenzhen"'
|
||||||
|
]
|
||||||
|
|
||||||
|
self.values_list = [
|
||||||
|
f'now, 9.1, 200, 0.3'
|
||||||
|
]
|
||||||
|
|
||||||
|
self.tbnum = 4
|
||||||
|
|
||||||
|
def create_user(self):
|
||||||
|
user_name = 'test'
|
||||||
|
tdSql.execute(f'create user {user_name} pass "test"')
|
||||||
|
tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}')
|
||||||
|
|
||||||
|
def prepare_data(self):
|
||||||
|
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||||
|
for j in self.values_list:
|
||||||
|
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||||
|
|
||||||
|
def user_privilege_check(self):
|
||||||
|
testconn = taos.connect(user='test', password='test')
|
||||||
|
expectErrNotOccured = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
sql = "select count(*) from db.stb where t2 = 'Beijing'"
|
||||||
|
res = testconn.query(sql)
|
||||||
|
data = res.fetch_all()
|
||||||
|
count = data[0][0]
|
||||||
|
except BaseException:
|
||||||
|
expectErrNotOccured = True
|
||||||
|
|
||||||
|
if expectErrNotOccured:
|
||||||
|
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured")
|
||||||
|
elif count != 1:
|
||||||
|
tdLog.exit(f"{sql}, expect result doesn't match")
|
||||||
|
pass
|
||||||
|
|
||||||
|
def user_privilege_error_check(self):
|
||||||
|
testconn = taos.connect(user='test', password='test')
|
||||||
|
expectErrNotOccured = False
|
||||||
|
|
||||||
|
sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"]
|
||||||
|
|
||||||
|
for sql in sql_list:
|
||||||
|
try:
|
||||||
|
res = testconn.execute(sql)
|
||||||
|
except BaseException:
|
||||||
|
expectErrNotOccured = True
|
||||||
|
|
||||||
|
if expectErrNotOccured:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured")
|
||||||
|
pass
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
self.prepare_data()
|
||||||
|
self.create_user()
|
||||||
|
self.user_privilege_check()
|
||||||
|
self.user_privilege_error_check()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -34,6 +34,9 @@ class TDTestCase:
|
||||||
if ret != 0:
|
if ret != 0:
|
||||||
tdLog.info("sml_test ret != 0")
|
tdLog.info("sml_test ret != 0")
|
||||||
|
|
||||||
|
tdSql.query(f"select * from ts3303.stb2")
|
||||||
|
tdSql.query(f"select * from ts3303.meters")
|
||||||
|
|
||||||
# tdSql.execute('use sml_db')
|
# tdSql.execute('use sml_db')
|
||||||
tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211")
|
tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
|
|
@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb2.py
|
python3 ./test.py -f 7-tmq/subscribeDb2.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb3.py
|
python3 ./test.py -f 7-tmq/subscribeDb3.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb4.py
|
python3 ./test.py -f 7-tmq/subscribeDb4.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb.py
|
#python3 ./test.py -f 7-tmq/subscribeStb.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb1.py
|
python3 ./test.py -f 7-tmq/subscribeStb1.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb2.py
|
python3 ./test.py -f 7-tmq/subscribeStb2.py
|
||||||
|
|
|
@ -554,7 +554,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
|
||||||
if (tsEnableScience) {
|
if (tsEnableScience) {
|
||||||
printf("%*e", width, GET_FLOAT_VAL(val));
|
printf("%*e", width, GET_FLOAT_VAL(val));
|
||||||
} else {
|
} else {
|
||||||
printf("%*.5f", width, GET_FLOAT_VAL(val));
|
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val));
|
||||||
|
if (n > TMAX(20, width)) {
|
||||||
|
printf("%*e", width, GET_FLOAT_VAL(val));
|
||||||
|
} else {
|
||||||
|
printf("%s", buf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
|
|
@ -1159,6 +1159,57 @@ int sml_td23881_Test() {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int sml_ts3303_Test() {
|
||||||
|
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
|
||||||
|
TAOS_RES *pRes = taos_query(taos, "drop database if exists ts3303");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "create database if not exists ts3303");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
const char *sql[] = {
|
||||||
|
"stb2,t1=1,dataModelName=t0 f1=283i32 1632299372000",
|
||||||
|
"stb2,t1=1,dataModelName=t0 f1=106i32 1632299378000",
|
||||||
|
"stb2,t1=4,dataModelName=t0 f1=144i32 1629716944000",
|
||||||
|
"stb2,t1=4,dataModelName=t0 f1=125i32 1629717012000",
|
||||||
|
"stb2,t1=4,dataModelName=t0 f1=144i32 1629717012000",
|
||||||
|
"stb2,t1=4,dataModelName=t0 f1=107i32 1629717013000",
|
||||||
|
"stb2,t1=6,dataModelName=t0 f1=154i32 1629717140000",
|
||||||
|
"stb2,t1=6,dataModelName=t0 f1=93i32 1629717140000",
|
||||||
|
"stb2,t1=6,dataModelName=t0 f1=134i32 1629717140000",
|
||||||
|
"stb2,t1=4,dataModelName=t0 f1=73i32 1629717140000",
|
||||||
|
"stb2,t1=4,dataModelName=t0 f1=83i32 1629717140000",
|
||||||
|
"stb2,t1=4,dataModelName=t0 f1=72i32 1629717140000",
|
||||||
|
};
|
||||||
|
|
||||||
|
const char *sql1[] = {
|
||||||
|
"meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000",
|
||||||
|
"meters,groupid=2,location=California.LosAngeles current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000",
|
||||||
|
};
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "use ts3303");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_schemaless_insert_ttl(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
|
||||||
|
TSDB_SML_TIMESTAMP_MILLI_SECONDS, 20);
|
||||||
|
|
||||||
|
int code = taos_errno(pRes);
|
||||||
|
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
|
taos_free_result(pRes);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
|
||||||
|
pRes = taos_schemaless_insert_ttl(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
|
||||||
|
TSDB_SML_TIMESTAMP_NANO_SECONDS, 20);
|
||||||
|
|
||||||
|
printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int sml_ttl_Test() {
|
int sml_ttl_Test() {
|
||||||
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
|
||||||
|
@ -1336,6 +1387,9 @@ int main(int argc, char *argv[]) {
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
|
ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
|
ret = sml_ts3303_Test(); // this test case need config sml table name using ./sml_test config_file
|
||||||
|
ASSERT(!ret);
|
||||||
|
|
||||||
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
|
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
|
||||||
// printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i])));
|
// printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i])));
|
||||||
// }
|
// }
|
||||||
|
|
Loading…
Reference in New Issue