fix several bugs related to subscribe
including: * server side crash * client side crash * server side memory leak
This commit is contained in:
parent
0836c2f747
commit
1d7626df1c
|
@ -53,12 +53,18 @@ typedef struct SParsedDataColInfo {
|
||||||
bool hasVal[TSDB_MAX_COLUMNS];
|
bool hasVal[TSDB_MAX_COLUMNS];
|
||||||
} SParsedDataColInfo;
|
} SParsedDataColInfo;
|
||||||
|
|
||||||
|
#pragma pack(push,1)
|
||||||
|
// this struct is transfered as binary, padding two bytes to avoid
|
||||||
|
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
|
||||||
|
// and set 'pack' to 1 to avoid break existing code.
|
||||||
typedef struct STidTags {
|
typedef struct STidTags {
|
||||||
|
int16_t padding;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int32_t tid;
|
int32_t tid;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
char tag[];
|
char tag[];
|
||||||
} STidTags;
|
} STidTags;
|
||||||
|
#pragma pack(pop)
|
||||||
|
|
||||||
typedef struct SJoinSupporter {
|
typedef struct SJoinSupporter {
|
||||||
SSubqueryState* pState;
|
SSubqueryState* pState;
|
||||||
|
|
|
@ -912,7 +912,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
|
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
|
||||||
|
|
||||||
pQueryMsg->head.contLen = htonl(msgLen);
|
pQueryMsg->head.contLen = htonl(msgLen);
|
||||||
assert(msgLen + minMsgSize() <= size);
|
assert(msgLen + minMsgSize() <= pCmd->allocSize);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -410,6 +410,9 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo);
|
||||||
|
size += sizeof(SQueryTableMsg) + 4096;
|
||||||
|
tscAllocPayload(&pSql->cmd, size);
|
||||||
for (int retry = 0; retry < 3; retry++) {
|
for (int retry = 0; retry < 3; retry++) {
|
||||||
tscRemoveFromSqlList(pSql);
|
tscRemoveFromSqlList(pSql);
|
||||||
|
|
||||||
|
|
|
@ -6549,12 +6549,15 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
int32_t i = pQInfo->tableIndex++;
|
int32_t i = pQInfo->tableIndex++;
|
||||||
STableQueryInfo *item = taosArrayGetP(pa, i);
|
STableQueryInfo *item = taosArrayGetP(pa, i);
|
||||||
|
|
||||||
char *output = pQuery->sdata[0]->data + i * rsize;
|
char *output = pQuery->sdata[0]->data + count * rsize;
|
||||||
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
||||||
|
|
||||||
output = varDataVal(output);
|
output = varDataVal(output);
|
||||||
STableId* id = TSDB_TABLEID(item->pTable);
|
STableId* id = TSDB_TABLEID(item->pTable);
|
||||||
|
|
||||||
|
*(int16_t *)output = 0;
|
||||||
|
output += sizeof(int16_t);
|
||||||
|
|
||||||
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
||||||
output += sizeof(id->uid);
|
output += sizeof(id->uid);
|
||||||
|
|
||||||
|
|
|
@ -2442,11 +2442,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
||||||
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||||
destroyTableMemIterator(pTableCheckInfo);
|
destroyTableMemIterator(pTableCheckInfo);
|
||||||
|
|
||||||
if (pTableCheckInfo->pDataCols != NULL) {
|
tdFreeDataCols(pTableCheckInfo->pDataCols);
|
||||||
taosTFree(pTableCheckInfo->pDataCols->buf);
|
pTableCheckInfo->pDataCols = NULL;
|
||||||
}
|
|
||||||
|
|
||||||
taosTFree(pTableCheckInfo->pDataCols);
|
|
||||||
taosTFree(pTableCheckInfo->pCompInfo);
|
taosTFree(pTableCheckInfo->pCompInfo);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
|
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
|
||||||
|
|
Loading…
Reference in New Issue