Merge pull request #1771 from taosdata/hotfix/fix-issues-scanned-by-tsc
Hotfix/fix issues scanned by tsc
This commit is contained in:
commit
0cce9bea01
|
@ -140,7 +140,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
|
||||||
// offset of cmd in SSqlObj structure
|
// offset of cmd in SSqlObj structure
|
||||||
char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd);
|
char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd);
|
||||||
|
|
||||||
if (pMemBuffer == NULL || pDesc->pColumnModel == NULL) {
|
if (pMemBuffer == NULL) {
|
||||||
|
tscError("%p pMemBuffer", pMemBuffer);
|
||||||
|
pRes->code = TSDB_CODE_APP_ERROR;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDesc->pColumnModel == NULL) {
|
||||||
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
|
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
|
||||||
|
|
||||||
tscError("%p no local buffer or intermediate result format model", pSqlObjAddr);
|
tscError("%p no local buffer or intermediate result format model", pSqlObjAddr);
|
||||||
|
|
|
@ -231,7 +231,11 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
|
|
||||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
|
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL) {
|
||||||
|
tscError("%p sql is already released", pSql->signature);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (pSql->signature != pSql) {
|
||||||
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
|
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -313,7 +317,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
pRes->rspType = rpcMsg->msgType;
|
pRes->rspType = rpcMsg->msgType;
|
||||||
pRes->rspLen = rpcMsg->contLen;
|
pRes->rspLen = rpcMsg->contLen;
|
||||||
|
|
||||||
if (pRes->rspLen > 0) {
|
if (pRes->rspLen > 0 && rpcMsg->pCont) {
|
||||||
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -172,17 +172,17 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) {
|
||||||
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
|
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
|
||||||
SSqlStream * pStream = (SSqlStream *)param;
|
SSqlStream * pStream = (SSqlStream *)param;
|
||||||
SSqlObj * pSql = (SSqlObj *)res;
|
SSqlObj * pSql = (SSqlObj *)res;
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
|
||||||
|
|
||||||
if (pSql == NULL || numOfRows < 0) {
|
if (pSql == NULL || numOfRows < 0) {
|
||||||
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
||||||
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
||||||
tscClearTableMetaInfo(pTableMetaInfo, true);
|
|
||||||
|
|
||||||
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||||
|
|
||||||
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
|
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
|
||||||
pStream->numOfRes += numOfRows;
|
pStream->numOfRes += numOfRows;
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||||
|
|
|
@ -757,7 +757,9 @@ void tscCloseTscObj(STscObj* pObj) {
|
||||||
taosTmrStopA(&(pObj->pTimer));
|
taosTmrStopA(&(pObj->pTimer));
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
|
|
||||||
sem_destroy(&pSql->rspSem);
|
if (pSql) {
|
||||||
|
sem_destroy(&pSql->rspSem);
|
||||||
|
}
|
||||||
rpcClose(pObj->pMgmtConn);
|
rpcClose(pObj->pMgmtConn);
|
||||||
|
|
||||||
pthread_mutex_destroy(&pObj->mutex);
|
pthread_mutex_destroy(&pObj->mutex);
|
||||||
|
|
|
@ -213,6 +213,7 @@ static void dnodeCheckDataDirOpenned(char *dir) {
|
||||||
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
|
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
|
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
|
||||||
|
close(fd);
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,6 +142,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
|
||||||
|
|
||||||
if (wordexp(fptr, &full_path, 0) != 0) {
|
if (wordexp(fptr, &full_path, 0) != 0) {
|
||||||
fprintf(stderr, "ERROR: illegal file name\n");
|
fprintf(stderr, "ERROR: illegal file name\n");
|
||||||
|
free(cmd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,13 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
if (arg) arguments->password = arg;
|
if (arg) arguments->password = arg;
|
||||||
break;
|
break;
|
||||||
case 'P':
|
case 'P':
|
||||||
tsMnodeShellPort = atoi(arg);
|
if (arg) {
|
||||||
|
tsMnodeShellPort = atoi(arg);
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "Invalid port\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case 't':
|
case 't':
|
||||||
arguments->timezone = arg;
|
arguments->timezone = arg;
|
||||||
|
@ -101,7 +107,12 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
wordfree(&full_path);
|
wordfree(&full_path);
|
||||||
break;
|
break;
|
||||||
case 'T':
|
case 'T':
|
||||||
arguments->threadNum = atoi(arg);
|
if (arg) {
|
||||||
|
arguments->threadNum = atoi(arg);
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "Invalid number of threads\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case 'd':
|
case 'd':
|
||||||
arguments->database = arg;
|
arguments->database = arg;
|
||||||
|
|
|
@ -340,6 +340,9 @@ int main(int argc, char *argv[]) {
|
||||||
int count_data_type = 0;
|
int count_data_type = 0;
|
||||||
char dataString[512];
|
char dataString[512];
|
||||||
bool do_aggreFunc = true;
|
bool do_aggreFunc = true;
|
||||||
|
|
||||||
|
memset(dataString, 0, 512);
|
||||||
|
|
||||||
if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0) {
|
if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0) {
|
||||||
do_aggreFunc = false;
|
do_aggreFunc = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,14 +383,13 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
|
||||||
|
|
||||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||||
|
|
||||||
while ((row = taos_fetch_row(result)) != NULL) {
|
if ((row = taos_fetch_row(result)) != NULL) {
|
||||||
isSet = true;
|
isSet = true;
|
||||||
pTableRecordInfo->isMetric = false;
|
pTableRecordInfo->isMetric = false;
|
||||||
strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
|
strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
|
||||||
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
|
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
|
||||||
strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
|
strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
|
||||||
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
|
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
|
@ -410,11 +409,10 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((row = taos_fetch_row(result)) != NULL) {
|
if ((row = taos_fetch_row(result)) != NULL) {
|
||||||
isSet = true;
|
isSet = true;
|
||||||
pTableRecordInfo->isMetric = true;
|
pTableRecordInfo->isMetric = true;
|
||||||
strcpy(pTableRecordInfo->tableRecord.metric, table);
|
strcpy(pTableRecordInfo->tableRecord.metric, table);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
|
|
|
@ -149,7 +149,9 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
|
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
|
||||||
if (rpcMsg == NULL || rpcMsg->pCont == NULL) {
|
assert(rpcMsg);
|
||||||
|
|
||||||
|
if (rpcMsg->pCont == NULL) {
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,7 +158,11 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtDecVgroupRef(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
mTrace("vgId:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes);
|
|
||||||
|
mTrace("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||||
|
if (pDb) {
|
||||||
|
mTrace("tables:%d", pDb->cfg.maxTables);
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -675,7 +675,7 @@ void SQLInfoDestroy(SSqlInfo *pInfo) {
|
||||||
free(pInfo->pDCLInfo->a);
|
free(pInfo->pDCLInfo->a);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->type == TSDB_SQL_CREATE_DB) {
|
if (pInfo->pDCLInfo != NULL && pInfo->type == TSDB_SQL_CREATE_DB) {
|
||||||
tVariantListDestroy(pInfo->pDCLInfo->dbOpt.keep);
|
tVariantListDestroy(pInfo->pDCLInfo->dbOpt.keep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -899,4 +899,4 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
|
||||||
pDBInfo->keep = NULL;
|
pDBInfo->keep = NULL;
|
||||||
|
|
||||||
memset(&pDBInfo->precision, 0, sizeof(SSQLToken));
|
memset(&pDBInfo->precision, 0, sizeof(SSQLToken));
|
||||||
}
|
}
|
||||||
|
|
|
@ -636,12 +636,16 @@ void tsBufResetPos(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
||||||
STSElem elem1 = {.vnode = -1};
|
STSElem elem1 = {.vnode = -1};
|
||||||
STSCursor* pCur = &pTSBuf->cur;
|
|
||||||
|
|
||||||
if (pTSBuf == NULL || pCur->vnodeIndex < 0) {
|
if (pTSBuf == NULL) {
|
||||||
return elem1;
|
return elem1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STSCursor* pCur = &pTSBuf->cur;
|
||||||
|
if (pCur != NULL && pCur->vnodeIndex < 0) {
|
||||||
|
return elem1;
|
||||||
|
}
|
||||||
|
|
||||||
STSBlock* pBlock = &pTSBuf->block;
|
STSBlock* pBlock = &pTSBuf->block;
|
||||||
|
|
||||||
elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
|
elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
|
||||||
|
@ -920,4 +924,4 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
pTSBuf->fileSize += getDataStartOffset();
|
pTSBuf->fileSize += getDataStartOffset();
|
||||||
return pTSBuf;
|
return pTSBuf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,10 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
|
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
|
||||||
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
|
if (pWindowResInfo == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (pWindowResInfo->capacity == 0) {
|
||||||
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
|
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,7 +216,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCheckInfo->iter == NULL) {
|
if (pCheckInfo->iter == NULL && pTable->mem) {
|
||||||
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
|
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
|
||||||
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
|
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue