VYPR
Critical severity9.8NVD Advisory· Published Mar 31, 2026· Updated Apr 10, 2026

CVE-2026-34400

CVE-2026-34400

Description

Alerta is a monitoring tool. Prior to version 9.1.0, the Query string search API (q=) was vulnerable to SQL injection via the Postgres query parser, which built WHERE clauses by interpolating user-supplied search terms directly into SQL strings via f-strings. This issue has been patched in version 9.1.0.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
alerta-serverPyPI
< 9.1.09.1.0

Affected products

1

Patches

2
fdd52cd1abad

fix(security): parameterize Postgres query parser to prevent SQL injection (#2040)

https://github.com/alerta/alertaNick SatterlyMar 27, 2026via ghsa
3 files changed · +247 87
  • alerta/database/backends/postgres/queryparser.py+106 1 modified
    @@ -5,6 +5,11 @@
     ParserElement.enablePackrat()
     
     
    +def _next_param(params):
    +    name = f'_qp_{len(params)}'
    +    return name
    +
    +
     class UnaryOperation:
         """takes one operand,e.g. not"""
     
    @@ -26,12 +31,18 @@ class SearchModifier(UnaryOperation):
         def __repr__(self):
             return f'{self.op} {self.operands}'
     
    +    def to_sql(self, params):
    +        return f'{self.op} {self.operands.to_sql(params)}'
    +
     
     class SearchAnd(BinaryOperation):
     
         def __repr__(self):
             return f'({self.lhs} AND {self.rhs})'
     
    +    def to_sql(self, params):
    +        return f'({self.lhs.to_sql(params)} AND {self.rhs.to_sql(params)})'
    +
     
     class SearchOr(BinaryOperation):
     
    @@ -40,12 +51,20 @@ def __repr__(self):
                 return f'({self.lhs} AND {self.rhs})'
             return f'({self.lhs} OR {self.rhs})'
     
    +    def to_sql(self, params):
    +        if getattr(self.rhs, 'op', None) == 'NOT':
    +            return f'({self.lhs.to_sql(params)} AND {self.rhs.to_sql(params)})'
    +        return f'({self.lhs.to_sql(params)} OR {self.rhs.to_sql(params)})'
    +
     
     class SearchNot(UnaryOperation):
     
         def __repr__(self):
             return f'NOT ({self.operands})'
     
    +    def to_sql(self, params):
    +        return f'NOT ({self.operands.to_sql(params)})'
    +
     
     class SearchTerm:
     
    @@ -113,6 +132,89 @@ def __repr__(self):
     
             raise ParseException(f'Search term did not match query syntax: {self.tokens}')
     
    +    def to_sql(self, params):
    +        if 'singleterm' in self.tokens:
    +            if self.tokens.fieldname == '_exists_':
    +                p = _next_param(params)
    +                params[p] = self.tokens.singleterm
    +                return f'"attributes"::jsonb ? %({p})s'
    +            elif self.tokens.fieldname in ['correlate', 'service', 'tags']:
    +                p = _next_param(params)
    +                params[p] = self.tokens.singleterm
    +                return f'%({p})s=ANY("{self.tokens.field[0]}")'
    +            elif self.tokens.attr:
    +                tokens_attr = self.tokens.attr.replace('_', 'attributes')
    +                p_key = _next_param(params)
    +                params[p_key] = self.tokens.fieldname
    +                p_val = _next_param(params)
    +                params[p_val] = f'%{self.tokens.singleterm}%'
    +                return f'"{tokens_attr}"::jsonb ->>%({p_key})s ILIKE %({p_val})s'
    +            else:
    +                p = _next_param(params)
    +                params[p] = f'%{self.tokens.singleterm}%'
    +                return f'"{self.tokens.field[0]}" ILIKE %({p})s'
    +        if 'phrase' in self.tokens:
    +            if self.tokens.field[0] == '__default_field__':
    +                p = _next_param(params)
    +                params[p] = f'\\y{self.tokens.phrase}\\y'
    +                return f'"__default_field__" ~* %({p})s'
    +            elif self.tokens.field[0] in ['correlate', 'service', 'tags']:
    +                p = _next_param(params)
    +                params[p] = self.tokens.phrase
    +                return f'%({p})s=ANY("{self.tokens.field[0]}")'
    +            elif self.tokens.attr:
    +                tokens_attr = self.tokens.attr.replace('_', 'attributes')
    +                p_key = _next_param(params)
    +                params[p_key] = self.tokens.fieldname
    +                p_val = _next_param(params)
    +                params[p_val] = f'\\y{self.tokens.phrase}\\y'
    +                return f'"{tokens_attr}"::jsonb ->>%({p_key})s ~* %({p_val})s'
    +            else:
    +                p = _next_param(params)
    +                params[p] = f'\\y{self.tokens.phrase}\\y'
    +                return f'"{self.tokens.field[0]}" ~* %({p})s'
    +        if 'wildcard' in self.tokens:
    +            p = _next_param(params)
    +            params[p] = f'\\y{self.tokens.wildcard}\\y'
    +            return f'"{self.tokens.field[0]}" ~* %({p})s'
    +        if 'regex' in self.tokens:
    +            p = _next_param(params)
    +            params[p] = self.tokens.regex
    +            return f'"{self.tokens.field[0]}" ~* %({p})s'
    +        if 'range' in self.tokens:
    +            if self.tokens.range[0].lowerbound == '*':
    +                lower_term = '1=1'
    +            else:
    +                p = _next_param(params)
    +                params[p] = self.tokens.range[0].lowerbound
    +                op = '>=' if 'inclusive' in self.tokens.range[0] else '>'
    +                lower_term = f'"{self.tokens.field[0]}" {op} %({p})s'
    +
    +            if self.tokens.range[2].upperbound == '*':
    +                upper_term = '1=1'
    +            else:
    +                p = _next_param(params)
    +                params[p] = self.tokens.range[2].upperbound
    +                op = '<=' if 'inclusive' in self.tokens.range[2] else '<'
    +                upper_term = f'"{self.tokens.field[0]}" {op} %({p})s'
    +            return f'({lower_term} AND {upper_term})'
    +        if 'onesidedrange' in self.tokens:
    +            p = _next_param(params)
    +            params[p] = self.tokens.onesidedrange.bound
    +            return f'("{self.tokens.field[0]}" {self.tokens.onesidedrange.op} %({p})s)'
    +        if 'subquery' in self.tokens:
    +            subquery_sql = self.tokens.subquery[0].to_sql(params)
    +            if self.tokens.attr:
    +                tokens_attr = 'attributes' if self.tokens.attr == '_' else self.tokens.attr
    +                p = _next_param(params)
    +                params[p] = self.tokens.fieldname
    +                tokens_fieldname = f'"{tokens_attr}"::jsonb ->>%({p})s'
    +            else:
    +                tokens_fieldname = f'"{self.tokens.fieldname or self.tokens.field[0]}"'
    +            return subquery_sql.replace('"__default_field__"', tokens_fieldname)
    +
    +        raise ParseException(f'Search term did not match query syntax: {self.tokens}')
    +
     
     # BNF for Lucene query syntax
     #
    @@ -181,4 +283,7 @@ class QueryParser:
     
         def parse(self, query, default_field=None):
             default_field = default_field or QueryParser.DEFAULT_FIELD
    -        return repr(query_expr.parseString(query)[0]).replace('__default_field__', default_field)
    +        params = {}
    +        parsed = query_expr.parseString(query)[0]
    +        sql = parsed.to_sql(params)
    +        return sql.replace('__default_field__', default_field), params
    
  • alerta/database/backends/postgres/utils.py+8 6 modified
    @@ -145,11 +145,12 @@ def from_params(params: MultiDict, customers=None, query_time=None):
             if params.get('q', None):
                 try:
                     parser = QueryParser()
    -                query = [parser.parse(
    +                parsed_query, parsed_vars = parser.parse(
                         query=params['q'],
                         default_field=params.get('q.df')
    -                )]
    -                qvars = dict()  # type: Dict[str, Any]
    +                )
    +                query = [parsed_query]
    +                qvars = dict(parsed_vars)  # type: Dict[str, Any]
                 except ParseException as e:
                     raise ApiError('Failed to parse query string.', 400, [e])
             else:
    @@ -230,11 +231,12 @@ def from_params(params: MultiDict, customers=None, query_time=None):
             if params.get('q', None):
                 try:
                     parser = QueryParser()
    -                query = [parser.parse(
    +                parsed_query, parsed_vars = parser.parse(
                         query=params['q'],
                         default_field=params.get('q.df')
    -                )]
    -                qvars = dict()  # type: Dict[str, Any]
    +                )
    +                query = [parsed_query]
    +                qvars = dict(parsed_vars)  # type: Dict[str, Any]
                 except ParseException as e:
                     raise ApiError('Failed to parse query string.', 400, [e])
             else:
    
  • tests/test_queryparser.py+133 80 modified
    @@ -23,103 +23,121 @@ def test_word_and_phrase_terms(self):
     
             # default field (ie. "text") contains word
             string = r'''quick'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"text" ILIKE \'%%quick%%\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"text" ILIKE %(_qp_0)s')
    +        self.assertEqual(params, {'_qp_0': '%quick%'})
     
             # default field (ie. "text") contains either words
             string = r'''quick OR brown'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ILIKE \'%%quick%%\' OR "text" ILIKE \'%%brown%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ILIKE %(_qp_0)s OR "text" ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '%quick%', '_qp_1': '%brown%'})
     
             # default field (ie. "text") contains either words (default operator)
             string = r'''quick brown'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ILIKE \'%%quick%%\' OR "text" ILIKE \'%%brown%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ILIKE %(_qp_0)s OR "text" ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '%quick%', '_qp_1': '%brown%'})
     
             # default field (ie. "text") contains exact phrase
             string = r'''"quick brown"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"text" ~* \'\\yquick brown\\y\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"text" ~* %(_qp_0)s')
    +        self.assertEqual(params, {'_qp_0': '\\yquick brown\\y'})
     
         def test_field_names(self):
     
             # field contains word
             string = r'''status:active'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"status" ILIKE \'%%active%%\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"status" ILIKE %(_qp_0)s')
    +        self.assertEqual(params, {'_qp_0': '%active%'})
     
             # field contains either words
             string = r'''title:(quick OR brown)'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("title" ILIKE \'%%quick%%\' OR "title" ILIKE \'%%brown%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("title" ILIKE %(_qp_0)s OR "title" ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '%quick%', '_qp_1': '%brown%'})
     
             # field contains either words (default operator)
             string = r'''title:(quick brown)'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("title" ILIKE \'%%quick%%\' OR "title" ILIKE \'%%brown%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("title" ILIKE %(_qp_0)s OR "title" ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '%quick%', '_qp_1': '%brown%'})
     
             # field contains exact phrase
             string = r'''author:"John Smith"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"author" ~* \'\\yJohn Smith\\y\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"author" ~* %(_qp_0)s')
    +        self.assertEqual(params, {'_qp_0': '\\yJohn Smith\\y'})
     
             # # any attribute contains word or phrase
             # string = r'''attributes.\*:(quick brown)'''
    -        # r = self.parser.parse(string)
    -        # self.assertEqual(r, '??')
    +        # sql, params = self.parser.parse(string)
    +        # self.assertEqual(sql, '??')
     
             # attribute field has non-null value
             string = r'''_exists_:title'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"attributes"::jsonb ? \'title\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"attributes"::jsonb ? %(_qp_0)s')
    +        self.assertEqual(params, {'_qp_0': 'title'})
     
             # attribute contains word
             string = r'''foo.vendor:cisco'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"foo"::jsonb ->>\'vendor\' ILIKE \'%%cisco%%\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"foo"::jsonb ->>%(_qp_0)s ILIKE %(_qp_1)s')
    +        self.assertEqual(params, {'_qp_0': 'vendor', '_qp_1': '%cisco%'})
     
             # attribute contains word ("_" shortcut)
             string = r'''_.vendor:cisco'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"attributes"::jsonb ->>\'vendor\' ILIKE \'%%cisco%%\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"attributes"::jsonb ->>%(_qp_0)s ILIKE %(_qp_1)s')
    +        self.assertEqual(params, {'_qp_0': 'vendor', '_qp_1': '%cisco%'})
     
             # attribute contains either words
             string = r'''attributes.vendor:(cisco OR juniper)'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("attributes"::jsonb ->>\'vendor\' ILIKE \'%%cisco%%\' OR "attributes"::jsonb ->>\'vendor\' ILIKE \'%%juniper%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("attributes"::jsonb ->>%(_qp_2)s ILIKE %(_qp_0)s OR "attributes"::jsonb ->>%(_qp_2)s ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '%cisco%', '_qp_1': '%juniper%', '_qp_2': 'vendor'})
     
             # attribute contains either words (default operator)
             string = r'''attributes.vendor:(cisco juniper)'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("attributes"::jsonb ->>\'vendor\' ILIKE \'%%cisco%%\' OR "attributes"::jsonb ->>\'vendor\' ILIKE \'%%juniper%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("attributes"::jsonb ->>%(_qp_2)s ILIKE %(_qp_0)s OR "attributes"::jsonb ->>%(_qp_2)s ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '%cisco%', '_qp_1': '%juniper%', '_qp_2': 'vendor'})
     
             # attribute contains either words ("_" shortcut, default operator)
             string = r'''_.vendor:(cisco juniper)'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("attributes"::jsonb ->>\'vendor\' ILIKE \'%%cisco%%\' OR "attributes"::jsonb ->>\'vendor\' ILIKE \'%%juniper%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("attributes"::jsonb ->>%(_qp_2)s ILIKE %(_qp_0)s OR "attributes"::jsonb ->>%(_qp_2)s ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '%cisco%', '_qp_1': '%juniper%', '_qp_2': 'vendor'})
     
             # attribute contains exact phrase
             string = r'''foo.vendor:"quick brown"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"foo"::jsonb ->>\'vendor\' ~* \'\\yquick brown\\y\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"foo"::jsonb ->>%(_qp_0)s ~* %(_qp_1)s')
    +        self.assertEqual(params, {'_qp_0': 'vendor', '_qp_1': '\\yquick brown\\y'})
     
             # attribute contains exact phrase ("_" shortcut)
             string = r'''_.vendor:"quick brown"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"attributes"::jsonb ->>\'vendor\' ~* \'\\yquick brown\\y\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"attributes"::jsonb ->>%(_qp_0)s ~* %(_qp_1)s')
    +        self.assertEqual(params, {'_qp_0': 'vendor', '_qp_1': '\\yquick brown\\y'})
     
         def test_wildcards(self):
     
             # ? = single character, * = one or more characters
             string = r'''text:qu?ck bro*'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yqu.?ck\\y\' OR "text" ~* \'\\ybro.*\\y\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s OR "text" ~* %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '\\yqu.?ck\\y', '_qp_1': '\\ybro.*\\y'})
     
         def test_regular_expressions(self):
     
             string = r'''name:/joh?n(ath[oa]n)/'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '"name" ~* \'joh?n(ath[oa]n)\'')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"name" ~* %(_qp_0)s')
    +        self.assertEqual(params, {'_qp_0': 'joh?n(ath[oa]n)'})
     
         def test_fuzziness(self):
             pass
    @@ -130,46 +148,56 @@ def test_proximity_searches(self):
         def test_ranges(self):
     
             string = r'''date:[2012-01-01 TO 2012-12-31]'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("date" >= \'2012-01-01\' AND "date" <= \'2012-12-31\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("date" >= %(_qp_0)s AND "date" <= %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '2012-01-01', '_qp_1': '2012-12-31'})
     
             string = r'''count:[1 TO 5]'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("count" >= \'1\' AND "count" <= \'5\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("count" >= %(_qp_0)s AND "count" <= %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '1', '_qp_1': '5'})
     
             string = r'''tag:{alpha TO omega}'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("tag" > \'alpha\' AND "tag" < \'omega\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("tag" > %(_qp_0)s AND "tag" < %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': 'alpha', '_qp_1': 'omega'})
     
             string = r'''count:[10 TO *]'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("count" >= \'10\' AND 1=1)')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("count" >= %(_qp_0)s AND 1=1)')
    +        self.assertEqual(params, {'_qp_0': '10'})
     
             string = r'''date:{* TO 2012-01-01}'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '(1=1 AND "date" < \'2012-01-01\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '(1=1 AND "date" < %(_qp_0)s)')
    +        self.assertEqual(params, {'_qp_0': '2012-01-01'})
     
             string = r'''count:[1 TO 5}'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("count" >= \'1\' AND "count" < \'5\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("count" >= %(_qp_0)s AND "count" < %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '1', '_qp_1': '5'})
     
         def test_unbounded_ranges(self):
     
             string = r'''age:>10'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("age" > \'10\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("age" > %(_qp_0)s)')
    +        self.assertEqual(params, {'_qp_0': '10'})
     
             string = r'''age:>=10'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("age" >= \'10\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("age" >= %(_qp_0)s)')
    +        self.assertEqual(params, {'_qp_0': '10'})
     
             string = r'''age:<10'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("age" < \'10\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("age" < %(_qp_0)s)')
    +        self.assertEqual(params, {'_qp_0': '10'})
     
             string = r'''age:<=10'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("age" <= \'10\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("age" <= %(_qp_0)s)')
    +        self.assertEqual(params, {'_qp_0': '10'})
     
         def test_boosting(self):
             pass
    @@ -178,45 +206,54 @@ def test_boolean_operators(self):
     
             # OR (||)
             string = r'''"jakarta apache" jakarta'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yjakarta apache\\y\' OR "text" ILIKE \'%%jakarta%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s OR "text" ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '%jakarta%'})
     
             string = r'''"jakarta apache" OR jakarta'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yjakarta apache\\y\' OR "text" ILIKE \'%%jakarta%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s OR "text" ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '%jakarta%'})
     
             string = r'''"jakarta apache" || jakarta'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yjakarta apache\\y\' OR "text" ILIKE \'%%jakarta%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s OR "text" ILIKE %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '%jakarta%'})
     
             # AND (&&)
             string = r'''"jakarta apache" AND "Apache Lucene"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yjakarta apache\\y\' AND "text" ~* \'\\yApache Lucene\\y\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s AND "text" ~* %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '\\yApache Lucene\\y'})
     
             string = r'''"jakarta apache" && "Apache Lucene"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yjakarta apache\\y\' AND "text" ~* \'\\yApache Lucene\\y\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s AND "text" ~* %(_qp_1)s)')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '\\yApache Lucene\\y'})
     
             # + (required)
             pass
     
             # NOT (!)
             string = r'''"jakarta apache" NOT "Apache Lucene"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yjakarta apache\\y\' AND NOT ("text" ~* \'\\yApache Lucene\\y\'))')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s AND NOT ("text" ~* %(_qp_1)s))')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '\\yApache Lucene\\y'})
     
             string = r'''"jakarta apache" !"Apache Lucene"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("text" ~* \'\\yjakarta apache\\y\' AND NOT ("text" ~* \'\\yApache Lucene\\y\'))')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("text" ~* %(_qp_0)s AND NOT ("text" ~* %(_qp_1)s))')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '\\yApache Lucene\\y'})
     
             string = r'''NOT "jakarta apache"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, 'NOT ("text" ~* \'\\yjakarta apache\\y\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, 'NOT ("text" ~* %(_qp_0)s)')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y'})
     
             string = r'''group:"jakarta apache" NOT group:"Apache Lucene"'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '("group" ~* \'\\yjakarta apache\\y\' AND NOT ("group" ~* \'\\yApache Lucene\\y\'))')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '("group" ~* %(_qp_0)s AND NOT ("group" ~* %(_qp_1)s))')
    +        self.assertEqual(params, {'_qp_0': '\\yjakarta apache\\y', '_qp_1': '\\yApache Lucene\\y'})
     
             # - (prohibit)
             pass
    @@ -225,14 +262,30 @@ def test_grouping(self):
     
             # field exact match
             string = r'''(quick OR brown) AND fox'''
    -        r = self.parser.parse(string)
    -        self.assertEqual(r, '(("text" ILIKE \'%%quick%%\' OR "text" ILIKE \'%%brown%%\') AND "text" ILIKE \'%%fox%%\')')
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '(("text" ILIKE %(_qp_0)s OR "text" ILIKE %(_qp_1)s) AND "text" ILIKE %(_qp_2)s)')
    +        self.assertEqual(params, {'_qp_0': '%quick%', '_qp_1': '%brown%', '_qp_2': '%fox%'})
     
             # field exact match
             string = r'''status:(active OR pending) title:(full text search)'''
    -        r = self.parser.parse(string)
    +        sql, params = self.parser.parse(string)
             self.assertEqual(
    -            r, '(("status" ILIKE \'%%active%%\' OR "status" ILIKE \'%%pending%%\') OR ("title" ILIKE \'%%full%%\' OR "title" ILIKE \'%%text%%\'))')
    +            sql, '(("status" ILIKE %(_qp_0)s OR "status" ILIKE %(_qp_1)s) OR ("title" ILIKE %(_qp_2)s OR "title" ILIKE %(_qp_3)s))')
    +        self.assertEqual(params, {'_qp_0': '%active%', '_qp_1': '%pending%', '_qp_2': '%full%', '_qp_3': '%text%'})
    +
    +    def test_sql_injection_prevention(self):
    +
    +        # single quotes in search terms should be safely parameterized
    +        string = r'''text:O'Brien'''
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"text" ILIKE %(_qp_0)s')
    +        self.assertIn("O'Brien", params['_qp_0'])
    +
    +        # SQL keywords in search terms should be safely parameterized
    +        string = '''"DROP TABLE alerts"'''
    +        sql, params = self.parser.parse(string)
    +        self.assertEqual(sql, '"text" ~* %(_qp_0)s')
    +        self.assertIn('DROP TABLE alerts', params['_qp_0'])
     
     
     def skip_mongodb():
    
aeba85a37a09

Replace mongo-specific query stirng syntax with database agnostic (#712)

https://github.com/alerta/alertaNick SatterlyOct 18, 2018via ghsa
13 files changed · +559 15
  • alerta/database/backends/mongodb/parser.py+85 0 added
    @@ -0,0 +1,85 @@
    +
    +
    +from pyparsing import Optional, ParseException, infixNotation, opAssoc
    +
    +from alerta.database.backends.mongodb.syntax import (and_, expression, not_,
    +                                                     or_, prohibit_modifier,
    +                                                     required_modifier, term)
    +
    +
    +class UnaryOperation:
    +    """takes one operand,e.g. not"""
    +
    +    def __init__(self, tokens):
    +        self.op, self.operands = tokens[0]
    +
    +
    +class BinaryOperation:
    +    """takes two or more operands, e.g. and, or"""
    +
    +    def __init__(self, tokens):
    +        self.op = tokens[0][1]
    +        self.operands = tokens[0][0::2]
    +
    +
    +class SearchAnd(BinaryOperation):
    +
    +    def __repr__(self):
    +        return '{{ "$and": [{0}] }}'.format(','.join(str(oper) for oper in self.operands))
    +
    +
    +class SearchOr(BinaryOperation):
    +
    +    def __repr__(self):
    +        return '{{ "$or": [{0}] }}'.format(','.join(str(oper) for oper in self.operands))
    +
    +
    +class SearchNot(UnaryOperation):
    +
    +    def __repr__(self):
    +        return '{{ "$not": {0} }}'.format(self.operands)
    +
    +
    +class SearchTerm:
    +
    +    def __init__(self, tokens):
    +        self.tokens = tokens
    +        if 'field' in self.tokens:
    +            self.term = self.tokens[2]
    +        else:
    +            self.term = self.tokens[0]
    +
    +    def __repr__(self):
    +        if 'field' in self.tokens:
    +            if 'word' in self.tokens:
    +                if self.tokens.field == '_exists_':
    +                    return '{{ "attributes.{}": {{ "$exists": true }} }}'.format(self.term)
    +                else:
    +                    return '{{ "{}": {{ "$regex": "{}" }} }}'.format(self.tokens.field, self.tokens.word)
    +            if 'string' in self.tokens:
    +                return '{{ "{}": "{}" }}'.format(self.tokens.field, self.tokens.string.strip('"'))
    +            if 'wildcard' in self.tokens:
    +                wildcard = self.tokens.wildcard.replace('?', '.?').replace('*', '.*')
    +                return '{{ "{}": {{ "$regex": "{}" }} }}'.format(self.tokens.field, wildcard)
    +            if 'regex' in self.tokens:
    +                return '{{ "{}": {{ "$regex": "{}" }} }}'.format(self.tokens.field, self.tokens.regex.strip('/'))
    +        else:
    +            if 'word' in self.tokens:
    +                return '{{ "$text": {{ "$search": "{}" }} }}'.format(self.tokens.word)
    +            if 'string' in self.tokens:
    +                return '{{ "$text": {{ "$search": "{}" }} }}'.format(self.tokens.string.replace('"', '\\"'))
    +            if 'regex' in self.tokens:
    +                return '{{ "$text": {{ "$search": "{}" }} }}'.format(self.tokens.regex.strip('/'))
    +        raise ParseException('Search term did not match query syntax: %s' % self.tokens)
    +
    +
    +term.addParseAction(SearchTerm)
    +
    +
    +expression << infixNotation(term,
    +                            [
    +                                (required_modifier | prohibit_modifier, 1, opAssoc.RIGHT),
    +                                ((not_ | '!').setParseAction(lambda: 'NOT'), 1, opAssoc.RIGHT, SearchNot),
    +                                ((and_ | '&&').setParseAction(lambda: 'AND'), 2, opAssoc.LEFT, SearchAnd),
    +                                (Optional(or_ | '||').setParseAction(lambda: 'OR'), 2, opAssoc.LEFT, SearchOr),
    +                            ])
    
  • alerta/database/backends/mongodb/syntax.py+43 0 added
    @@ -0,0 +1,43 @@
    +
    +from pyparsing import (CaselessKeyword, Combine, Forward, Group, Literal,
    +                       OneOrMore, Optional, ParserElement, QuotedString, Regex,
    +                       Suppress, White, Word, printables, pyparsing_common)
    +
    +ParserElement.enablePackrat()
    +
    +COLON, LBRACK, RBRACK, LBRACE, RBRACE, TILDE, CARAT = map(Literal, ':[]{}~^')
    +LPAR, RPAR = map(Suppress, '()')
    +and_, or_, not_, to_ = map(CaselessKeyword, 'AND OR NOT TO'.split())
    +keyword = and_ | or_ | not_ | to_
    +
    +expression = Forward()
    +
    +valid_word = Word(printables, excludeChars='?*:"').setName('word')
    +valid_word.setParseAction(
    +    lambda t: t[0].replace('\\\\', chr(127)).replace('\\', '').replace(chr(127), '\\')
    +)
    +
    +string = QuotedString('"', unquoteResults=False)
    +
    +required_modifier = Literal('+')('required')
    +prohibit_modifier = Literal('-')('prohibit')
    +integer = Regex(r'\d+').setParseAction(lambda t: int(t[0]))
    +proximity_modifier = Group(TILDE + integer('proximity'))
    +number = pyparsing_common.fnumber()
    +fuzzy_modifier = TILDE + Optional(number, default=0.5)('fuzzy')
    +
    +term = Forward()
    +field_name = valid_word().setName('fieldname')
    +incl_range_search = Group(LBRACK + term('lower') + to_ + term('upper') + RBRACK)
    +excl_range_search = Group(LBRACE + term('lower') + to_ + term('upper') + RBRACE)
    +range_search = incl_range_search('incl_range') | excl_range_search('excl_range')
    +boost = (CARAT + number('boost'))
    +
    +string_expr = Group(string + proximity_modifier) | string
    +word_expr = Group(valid_word + fuzzy_modifier) | valid_word
    +wildcard_expr = Combine(OneOrMore(Regex('[a-z0-9]*[\?\*][a-z0-9]*') | White(' ', max=1) + ~White())).setName('wildcard')
    +regular_expr = QuotedString('/', unquoteResults=False).setName('regex')
    +
    +term << (Optional(field_name('field') + COLON) +
    +         (regular_expr('regex') | wildcard_expr('wildcard') | word_expr('word') | string_expr('string') | range_search('range') | Group(LPAR + expression + RPAR)) +
    +         Optional(boost))
    
  • alerta/database/backends/mongodb/utils.py+4 3 modified
    @@ -6,6 +6,7 @@
     from flask import g
     from werkzeug.datastructures import MultiDict
     
    +from alerta.database.backends.mongodb.parser import expression
     from alerta.database.base import QueryBuilder
     from alerta.utils.format import DateTime
     
    @@ -16,19 +17,19 @@
     class QueryBuilderImpl(QueryBuilder):
     
         @staticmethod
    -    def from_params(params, query_time=None):
    +    def from_params(params: MultiDict, query_time=None):
     
             # q
             if params.get('q', None):
    -            query = json.loads(params['q'])
    +            query = json.loads(repr(expression.parseString(params.get('q'))[0]))
             else:
                 query = dict()
     
             # customers
             if g.get('customers', None):
                 customer_query = {'customer': {'$in': g.get('customers')}}
             else:
    -            customer_query = None
    +            customer_query = None  # type: ignore
     
             # from-date, to-date
             from_date = params.get('from-date', default=None, type=DateTime.parse)
    
  • alerta/database/backends/postgres/parser.py+87 0 added
    @@ -0,0 +1,87 @@
    +
    +from flask import current_app
    +from pyparsing import Optional, ParseException, infixNotation, opAssoc
    +
    +from alerta.database.backends.postgres.syntax import (and_, expression, not_,
    +                                                      or_, prohibit_modifier,
    +                                                      required_modifier, term)
    +
    +
    +class UnaryOperation:
    +    """takes one operand,e.g. not"""
    +
    +    def __init__(self, tokens):
    +        self.op, self.operands = tokens[0]
    +
    +
    +class BinaryOperation:
    +    """takes two or more operands, e.g. and, or"""
    +
    +    def __init__(self, tokens):
    +        self.op = tokens[0][1]
    +        self.lhs = tokens[0][0]
    +        self.rhs = tokens[0][2]
    +
    +
    +class SearchAnd(BinaryOperation):
    +
    +    def __repr__(self):
    +        return '{} AND {}'.format(self.lhs, self.rhs)
    +
    +
    +class SearchOr(BinaryOperation):
    +
    +    def __repr__(self):
    +        return '{} OR {}'.format(self.lhs, self.rhs)
    +
    +
    +class SearchNot(UnaryOperation):
    +
    +    def __repr__(self):
    +        return 'NOT {}'.format(self.operands)
    +
    +
    +class SearchTerm:
    +
    +    def __init__(self, tokens):
    +        self.default_field = current_app.config['DEFAULT_FIELD']
    +        self.tokens = tokens
    +        if 'field' in self.tokens:
    +            self.term = self.tokens[2]
    +        else:
    +            self.term = self.tokens[0]
    +
    +    def __repr__(self):
    +        if 'field' in self.tokens:
    +            if 'word' in self.tokens:
    +                if self.tokens.field == '_exists_':
    +                    return '"attributes"::jsonb ? \'{}\''.format(self.term)
    +                else:
    +                    return '"{}" ILIKE \'%%{}%%\''.format(self.tokens.field, self.tokens.word)
    +            if 'string' in self.tokens:
    +                return '"{}"=\'{}\''.format(self.tokens.field, self.tokens.string.strip('"'))
    +            if 'wildcard' in self.tokens:
    +                wildcard = self.tokens.wildcard.replace('?', '.?').replace('*', '.*')
    +                return '"{}" ~* \'{}\''.format(self.tokens.field, wildcard)
    +            if 'regex' in self.tokens:
    +                return '"{}" ~* \'{}\''.format(self.tokens.field, self.tokens.regex.strip('/'))
    +        else:
    +            if 'word' in self.tokens:
    +                return '"{}" ILIKE \'%%{}%%\''.format(self.default_field, self.tokens.word)
    +            if 'string' in self.tokens:
    +                return '"{}" ~* \'{}\''.format(self.default_field, self.tokens.string.strip('"'))
    +            if 'regex' in self.tokens:
    +                return '"{}" ~* \'{}\''.format(self.default_field, self.tokens.regex.strip('/'))
    +        raise ParseException('Search term did not match query syntax: %s' % self.tokens)
    +
    +
    +term.addParseAction(SearchTerm)
    +
    +
    +expression << infixNotation(term,
    +                            [
    +                                (required_modifier | prohibit_modifier, 1, opAssoc.RIGHT),
    +                                ((not_ | '!').setParseAction(lambda: 'NOT'), 1, opAssoc.RIGHT, SearchNot),
    +                                ((and_ | '&&').setParseAction(lambda: 'AND'), 2, opAssoc.LEFT, SearchAnd),
    +                                (Optional(or_ | '||').setParseAction(lambda: 'OR'), 2, opAssoc.LEFT, SearchOr),
    +                            ])
    
  • alerta/database/backends/postgres/syntax.py+43 0 added
    @@ -0,0 +1,43 @@
    +
    +from pyparsing import (CaselessKeyword, Combine, Forward, Group, Literal,
    +                       OneOrMore, Optional, ParserElement, QuotedString, Regex,
    +                       Suppress, White, Word, printables, pyparsing_common)
    +
    +ParserElement.enablePackrat()
    +
    +COLON, LBRACK, RBRACK, LBRACE, RBRACE, TILDE, CARAT = map(Literal, ':[]{}~^')
    +LPAR, RPAR = map(Suppress, '()')
    +and_, or_, not_, to_ = map(CaselessKeyword, 'AND OR NOT TO'.split())
    +keyword = and_ | or_ | not_ | to_
    +
    +expression = Forward()
    +
    +valid_word = Word(printables, excludeChars='?*:"').setName('word')
    +valid_word.setParseAction(
    +    lambda t: t[0].replace('\\\\', chr(127)).replace('\\', '').replace(chr(127), '\\')
    +)
    +
    +string = QuotedString('"', unquoteResults=False)
    +
    +required_modifier = Literal('+')('required')
    +prohibit_modifier = Literal('-')('prohibit')
    +integer = Regex(r'\d+').setParseAction(lambda t: int(t[0]))
    +proximity_modifier = Group(TILDE + integer('proximity'))
    +number = pyparsing_common.fnumber()
    +fuzzy_modifier = TILDE + Optional(number, default=0.5)('fuzzy')
    +
    +term = Forward()
    +field_name = valid_word().setName('fieldname')
    +incl_range_search = Group(LBRACK + term('lower') + to_ + term('upper') + RBRACK)
    +excl_range_search = Group(LBRACE + term('lower') + to_ + term('upper') + RBRACE)
    +range_search = incl_range_search('incl_range') | excl_range_search('excl_range')
    +boost = (CARAT + number('boost'))
    +
    +string_expr = Group(string + proximity_modifier) | string
    +word_expr = Group(valid_word + fuzzy_modifier) | valid_word
    +wildcard_expr = Combine(OneOrMore(Regex('[a-z0-9]*[\?\*][a-z0-9]*') | White(' ', max=1) + ~White())).setName('wildcard')
    +regular_expr = QuotedString('/', unquoteResults=False).setName('regex')
    +
    +term << (Optional(field_name('field') + COLON) +
    +         (regular_expr('regex') | wildcard_expr('wildcard') | word_expr('word') | string_expr('string') | range_search('range') | Group(LPAR + expression + RPAR)) +
    +         Optional(boost))
    
  • alerta/database/backends/postgres/utils.py+3 1 modified
    @@ -4,6 +4,7 @@
     from flask import g
     from werkzeug.datastructures import MultiDict
     
    +from alerta.database.backends.postgres.parser import expression
     from alerta.database.base import QueryBuilder
     from alerta.utils.format import DateTime
     
    @@ -18,7 +19,8 @@ def from_params(params, query_time=None):
     
             # q
             if params.get('q', None):
    -            raise NotImplementedError("'q' search parameter is not currently supported")
    +            query = [repr(expression.parseString(params.get('q'))[0])]
    +            qvars = dict()
             else:
                 query = ['1=1']
                 qvars = dict()
    
  • alerta/settings.py+1 0 modified
    @@ -23,6 +23,7 @@
     ALARM_MODEL = 'ALERTA'  # 'ALERTA' (default) or 'ISA_18_2'
     
     QUERY_LIMIT = 1000
    +DEFAULT_FIELD = 'text'  # default field if no search prefix specified (Postgres only)
     DEFAULT_PAGE_SIZE = QUERY_LIMIT  # maximum number of alerts returned by a single query
     BULK_QUERY_LIMIT = 100000  # max number of alerts for bulk endpoints
     HISTORY_LIMIT = 100  # cap the number of alert history entries
    
  • .isort.cfg+1 1 modified
    @@ -1,2 +1,2 @@
     [settings]
    -known_third_party = bcrypt,bson,celery,click,dateutil,flask,flask_compress,flask_cors,itsdangerous,jwt,kombu,ldap,pkg_resources,psycopg2,pymongo,pytz,raven,requests,saml2,setuptools,six,telepot,werkzeug
    +known_third_party = bcrypt,bson,celery,click,dateutil,flask,flask_compress,flask_cors,itsdangerous,jwt,kombu,ldap,pkg_resources,psycopg2,pymongo,pyparsing,pytz,raven,requests,saml2,setuptools,six,telepot,werkzeug
    
  • mypy.ini+3 0 modified
    @@ -21,6 +21,9 @@ ignore_missing_imports = True
     [mypy-psycopg2.*]
     ignore_missing_imports = True
     
    +[mypy-pyparsing.*]
    +ignore_missing_imports = True
    +
     [mypy-celery.*]
     ignore_missing_imports = True
     
    
  • setup.py+1 0 modified
    @@ -43,6 +43,7 @@ def read(filename):
             'raven[flask]>=6.2.1',
             'pymongo>=3.0',
             'psycopg2',
    +        'pyparsing',
             'requests',
             'python-dateutil',
             'pytz',
    
  • tests/test_alerts.py+6 10 modified
    @@ -640,18 +640,14 @@ def test_aggregations(self):
             data = json.loads(response.data.decode('utf-8'))
             self.assertIn('services', data)
     
    -    def test_mongo_query(self):
    +    def test_query_param(self):
             # create alert
             response = self.client.post('/alert', data=json.dumps(self.normal_alert), headers=self.headers)
             self.assertEqual(response.status_code, 201)
             data = json.loads(response.data.decode('utf-8'))
     
    -        if self.app.config['DATABASE_URL'].startswith('mongodb'):
    -            query_dict = {
    -                'q': '{"event": "node_up"}'
    -            }
    -            response = self.client.get('/alerts', query_string=query_dict)
    -            self.assertEqual(response.status_code, 200)
    -            data = json.loads(response.data.decode('utf-8'))
    -            self.assertEquals(data['total'], 1)
    -            self.assertEquals(data['alerts'][0]['event'], 'node_up')
    +        response = self.client.get('/alerts?q=event:node_up')
    +        self.assertEqual(response.status_code, 200)
    +        data = json.loads(response.data.decode('utf-8'))
    +        self.assertEquals(data['total'], 1)
    +        self.assertEquals(data['alerts'][0]['event'], 'node_up')
    
  • tests/test_mongo_query.py+135 0 added
    @@ -0,0 +1,135 @@
    +import unittest
    +
    +from alerta.database.backends.mongodb.parser import expression as mongo_search
    +
    +
    +class MongoQueryTestCase(unittest.TestCase):
    +
    +    def setUp(self):
    +        pass
    +
    +    def test_word_and_phrase_terms(self):
    +
    +        # default field (ie. "text") contains word
    +        string = r'''quick'''
    +        r = mongo_search.parseString(string)
    +        self.assertEqual(repr(r[0]), '{ "$text": { "$search": "quick" } }')
    +
    +        # default field (ie. "text") contains phrase
    +        string = r'''"quick brown"'''
    +        r = mongo_search.parseString(string)
    +        self.assertEqual(repr(r[0]), '{ "$text": { "$search": "\\"quick brown\\"" } }')
    +
    +    def test_field_names(self):
    +
    +        # field contains word
    +        string = r'''status:active'''
    +        r = mongo_search.parseString(string)
    +        self.assertEqual(repr(r[0]), '{ "status": { "$regex": "active" } }')
    +
    +        # # field contains either words
    +        # string = r'''title:(quick OR brown)'''
    +        # r = mongo_search.parseString(string)
    +        # self.assertEqual(repr(r[0]), '{ "title": { "$or": [ { "$regex": { "quick" } }, { "$regex": { "brown" } } ] } }', repr(r[0]))
    +
    +        # # field contains either words (default operator)
    +        # string = r'''title:(quick brown)'''
    +        # r = mongo_search.parseString(string)
    +        # self.assertEqual(repr(r[0]), '{ "status": { "$regex": "active" } }')
    +
    +        # field exact match
    +        string = r'''author:"John Smith"'''
    +        r = mongo_search.parseString(string)
    +        self.assertEqual(repr(r[0]), '{ "author": "John Smith" }')
    +
    +        # # any attribute contains word or phrase
    +        # string = r'''attributes.\*:(quick brown)'''
    +        # r = mongo_search.parseString(string)
    +        # self.assertEqual(repr(r[0]), '??')
    +
    +        # attribute field has non-null value
    +        string = r'''_exists_:title'''
    +        r = mongo_search.parseString(string)
    +        self.assertEqual(repr(r[0]), '{ "attributes.title": { "$exists": true } }')
    +
    +    def test_wildcards(self):
    +
    +        # ? = single character, * = one or more characters
    +        string = r'''text:qu?ck bro*'''
    +        r = mongo_search.parseString(string)
    +        self.assertEqual(repr(r[0]), '{ "text": { "$regex": "qu.?ck bro.*" } }')
    +
    +    def test_regular_expressions(self):
    +
    +        string = r'''name:/joh?n(ath[oa]n)/'''
    +        r = mongo_search.parseString(string)
    +        self.assertEqual(repr(r[0]), '{ "name": { "$regex": "joh?n(ath[oa]n)" } }')
    +
    +    def test_fuzziness(self):
    +        pass
    +
    +    def test_proximity_searches(self):
    +        pass
    +
    +    # def test_ranges(self):
    +    #
    +    #     string = r'''date:[2012-01-01 TO 2012-12-31]'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''count:[1 TO 5]'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''tag:{alpha TO omega}'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''count:[10 TO *]'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r''''date:{* TO 2012-01-01}'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r''''count:[1 TO 5}'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    # def test_unbounded_ranges(self):
    +    #
    +    #     string = r'''age:>10'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''age:>=10'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''age:<10'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''age:<=10'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +
    +    def test_boosting(self):
    +        pass
    +
    +    def test_boolean_operators(self):
    +        pass
    +
    +    # def test_grouping(self):
    +    #
    +    #     # field exact match
    +    #     string = r'''(quick OR brown) AND fox'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     # field exact match
    +    #     string = r'''status:(active OR pending) title:(full text search)'''
    +    #     r = mongo_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +
    
  • tests/test_postgres_query.py+147 0 added
    @@ -0,0 +1,147 @@
    +import unittest
    +
    +from alerta.app import create_app
    +from alerta.database.backends.postgres.parser import expression as postgres_search
    +
    +
    +class PostgresQueryTestCase(unittest.TestCase):
    +
    +    def setUp(self):
    +
    +        test_config = {
    +            'TESTING': True
    +        }
    +        self.app = create_app(test_config)
    +
    +    def test_word_and_phrase_terms(self):
    +
    +        with self.app.test_request_context('/'):
    +
    +            # default field (ie. "text") contains word
    +            string = r'''quick'''
    +            r = postgres_search.parseString(string)
    +            self.assertEqual(repr(r[0]), '"text" ILIKE \'%%quick%%\'')
    +
    +            # default field (ie. "text") contains phrase
    +            string = r'''"quick brown"'''
    +            r = postgres_search.parseString(string)
    +            self.assertEqual(repr(r[0]), '"text" ~* \'quick brown\'')
    +
    +    def test_field_names(self):
    +
    +        with self.app.test_request_context('/'):
    +
    +            # field contains word
    +            string = r'''status:active'''
    +            r = postgres_search.parseString(string)
    +            self.assertEqual(repr(r[0]), '"status" ILIKE \'%%active%%\'')
    +
    +            # # field contains either words
    +            # string = r'''title:(quick OR brown)'''
    +            # r = postgres_search.parseString(string)
    +            # self.assertEqual(repr(r[0]), '', repr(r[0]))
    +
    +            # # field contains either words (default operator)
    +            # string = r'''title:(quick brown)'''
    +            # r = postgres_search.parseString(string)
    +            # self.assertEqual(repr(r[0]), '')
    +
    +            # field exact match
    +            string = r'''author:"John Smith"'''
    +            r = postgres_search.parseString(string)
    +            self.assertEqual(repr(r[0]), '"author"=\'John Smith\'')
    +
    +            # # any attribute contains word or phrase
    +            # string = r'''attributes.\*:(quick brown)'''
    +            # r = postgres_search.parseString(string)
    +            # self.assertEqual(repr(r[0]), '??')
    +
    +            # attribute field has non-null value
    +            string = r'''_exists_:title'''
    +            r = postgres_search.parseString(string)
    +            self.assertEqual(repr(r[0]), '"attributes"::jsonb ? \'title\'')
    +
    +    def test_wildcards(self):
    +
    +        with self.app.test_request_context('/'):
    +
    +            # ? = single character, * = one or more characters
    +            string = r'''text:qu?ck bro*'''
    +            r = postgres_search.parseString(string)
    +            self.assertEqual(repr(r[0]), '"text" ~* \'qu.?ck bro.*\'')
    +
    +    def test_regular_expressions(self):
    +
    +        with self.app.test_request_context('/'):
    +
    +            string = r'''name:/joh?n(ath[oa]n)/'''
    +            r = postgres_search.parseString(string)
    +            self.assertEqual(repr(r[0]), '"name" ~* \'joh?n(ath[oa]n)\'')
    +
    +    def test_fuzziness(self):
    +        pass
    +
    +    def test_proximity_searches(self):
    +        pass
    +
    +    # def test_ranges(self):
    +    #
    +    #     string = r'''date:[2012-01-01 TO 2012-12-31]'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''count:[1 TO 5]'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''tag:{alpha TO omega}'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''count:[10 TO *]'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r''''date:{* TO 2012-01-01}'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r''''count:[1 TO 5}'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    # def test_unbounded_ranges(self):
    +    #
    +    #     string = r'''age:>10'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''age:>=10'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''age:<10'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     string = r'''age:<=10'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +
    +    def test_boosting(self):
    +        pass
    +
    +    def test_boolean_operators(self):
    +        pass
    +
    +    # def test_grouping(self):
    +    #
    +    #     # field exact match
    +    #     string = r'''(quick OR brown) AND fox'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    +    #
    +    #     # field exact match
    +    #     string = r'''status:(active OR pending) title:(full text search)'''
    +    #     r = postgres_search.parseString(string)
    +    #     self.assertEqual(repr(r[0]), '??')
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

8

News mentions

0

No linked articles in our index yet.