diff --git a/Lib/email/_header_value_parser.py b/Lib/email/_header_value_parser.py index 792072ab9f6128..c503932ec237d7 100644 --- a/Lib/email/_header_value_parser.py +++ b/Lib/email/_header_value_parser.py @@ -1001,7 +1001,7 @@ class _InvalidEwError(errors.HeaderParseError): # returns the complete phrase from the start of the string value, plus any # characters left in the string after the phrase is removed. -_wsp_splitter = re.compile(r'([{}]+)'.format(''.join(WSP))).split +_non_wsp_matcher = re.compile(r'[^{}]*'.format(re.escape(_WSP))).match _non_atom_end_matcher = re.compile(r"[^{}]+".format( re.escape(''.join(ATOM_ENDS)))).match _non_printable_finder = re.compile(r"[\x00-\x20\x7F]").findall @@ -1022,39 +1022,57 @@ def _validate_xtext(xtext): xtext.defects.append(errors.UndecodableBytesDefect( "Non-ASCII characters found in header token")) -def _get_ptext_to_endchars(value, endchars): +_ERROR_TAIL_LEN = 60 + +def _tail(value, pos): + """Return a bounded slice of the unparsed input for error messages. + + Interpolating the whole remaining input into a ``HeaderParseError`` is + linear in its length, and many of these errors are raised and caught as + ordinary control flow, which would reintroduce quadratic behaviour on + large inputs (gh-136063). The truncated fragment is only an aid for + diagnostics, so capping it is harmless. + """ + if len(value) - pos <= _ERROR_TAIL_LEN: + return value[pos:] + return value[pos:pos + _ERROR_TAIL_LEN] + '...' + +def _get_ptext_to_endchars(value, pos, endchars): """Scan printables/quoted-pairs until endchars and return unquoted ptext. This function turns a run of qcontent, ccontent-without-comments, or dtext-with-quoted-printables into a single string by unquoting any - quoted printables. It returns the string, the remaining value, and - a flag that is True iff there were any quoted printables decoded. + quoted printables. It returns the string, the index of the remaining + value, and a flag that is True iff there were any quoted printables + decoded. """ - if not value: - return '', '', False - fragment, *remainder = _wsp_splitter(value, 1) + if pos >= len(value): + return '', pos, False + # The fragment is the run of non-WSP characters starting at pos. + fragend = _non_wsp_matcher(value, pos).end() vchars = [] escape = False had_qp = False - for pos in range(len(fragment)): - if fragment[pos] == '\\': + while pos < fragend: + char = value[pos] + if char == '\\': if escape: escape = False had_qp = True else: escape = True + pos += 1 continue if escape: escape = False - elif fragment[pos] in endchars: + elif char in endchars: break - vchars.append(fragment[pos]) - else: - pos = pos + 1 - return ''.join(vchars), ''.join([fragment[pos:]] + remainder), had_qp + vchars.append(char) + pos += 1 + return ''.join(vchars), pos, had_qp -def get_fws(value): +def get_fws(value, pos=0): """FWS = 1*WSP This isn't the RFC definition. We're using fws to represent tokens where @@ -1062,35 +1080,43 @@ def get_fws(value): been done so we don't need to watch out for CRLF. """ - newvalue = value.lstrip() - fws = WhiteSpaceTerminal(value[:len(value)-len(newvalue)], 'fws') - return fws, newvalue + newpos = pos + while newpos < len(value) and value[newpos].isspace(): + newpos += 1 + fws = WhiteSpaceTerminal(value[pos:newpos], 'fws') + return fws, newpos -def get_encoded_word(value, terminal_type='vtext'): +def get_encoded_word(value, pos=0, terminal_type='vtext'): """ encoded-word = "=?" charset "?" encoding "?" encoded-text "?=" """ ew = EncodedWord() - if not value.startswith('=?'): + if not value.startswith('=?', pos): raise errors.HeaderParseError( - "expected encoded word but found {}".format(value)) - tok, *remainder = value[2:].split('?=', 1) - if tok == value[2:]: + "expected encoded word but found {}".format(_tail(value, pos))) + end = value.find('?=', pos + 2) + if end < 0: raise errors.HeaderParseError( - "expected encoded word but found {}".format(value)) - remstr = ''.join(remainder) - if (len(remstr) > 1 and - remstr[0] in hexdigits and - remstr[1] in hexdigits and + "expected encoded word but found {}".format(_tail(value, pos))) + tok = value[pos + 2:end] + end += 2 + if (len(value) > end + 1 and + value[end] in hexdigits and + value[end + 1] in hexdigits and tok.count('?') < 2): # The ? after the CTE was followed by an encoded word escape (=XX). - rest, *remainder = remstr.split('?=', 1) - tok = tok + '?=' + rest + nextend = value.find('?=', end) + if nextend < 0: + tok = tok + '?=' + value[end:] + end = len(value) + else: + tok = tok + '?=' + value[end:nextend] + end = nextend + 2 if len(tok.split()) > 1: ew.defects.append(errors.InvalidHeaderDefect( "whitespace inside encoded word")) - ew.cte = value - value = ''.join(remainder) + ew.cte = value[pos:end] + pos = end try: text, charset, lang, defects = _ew.decode('=?' + tok + '?=') except (ValueError, KeyError): @@ -1099,23 +1125,24 @@ def get_encoded_word(value, terminal_type='vtext'): ew.charset = charset ew.lang = lang ew.defects.extend(defects) - while text: - if text[0] in WSP: - token, text = get_fws(text) + tpos = 0 + while tpos < len(text): + if text[tpos] in WSP: + token, tpos = get_fws(text, tpos) ew.append(token) continue - chars, *remainder = _wsp_splitter(text, 1) - vtext = ValueTerminal(chars, terminal_type) + chars_end = _non_wsp_matcher(text, tpos).end() + vtext = ValueTerminal(text[tpos:chars_end], terminal_type) _validate_xtext(vtext) ew.append(vtext) - text = ''.join(remainder) + tpos = chars_end # Encoded words should be followed by a WS - if value and value[0] not in WSP: + if pos < len(value) and value[pos] not in WSP: ew.defects.append(errors.InvalidHeaderDefect( "missing trailing whitespace after encoded-word")) - return ew, value + return ew, pos -def get_unstructured(value): +def get_unstructured(value, pos=0): """unstructured = (*([FWS] vchar) *WSP) / obs-unstruct obs-unstruct = *((*LF *CR *(obs-utext) *LF *CR)) / FWS) obs-utext = %d0 / obs-NO-WS-CTL / LF / CR @@ -1139,15 +1166,15 @@ def get_unstructured(value): # will never send us strings with bare CR or LF. unstructured = UnstructuredTokenList() - while value: - if value[0] in WSP: - token, value = get_fws(value) + while pos < len(value): + if value[pos] in WSP: + token, pos = get_fws(value, pos) unstructured.append(token) continue valid_ew = True - if value.startswith('=?'): + if value.startswith('=?', pos): try: - token, value = get_encoded_word(value, 'utext') + token, pos = get_encoded_word(value, pos, 'utext') except _InvalidEwError: valid_ew = False except errors.HeaderParseError: @@ -1167,7 +1194,8 @@ def get_unstructured(value): unstructured[-1], 'fws') unstructured.append(token) continue - tok, *remainder = _wsp_splitter(value, 1) + tokend = _non_wsp_matcher(value, pos).end() + tok = value[pos:tokend] # Split in the middle of an atom if there is a rfc2047 encoded word # which does not have WSP on both sides. The defect will be registered # the next time through the loop. @@ -1175,14 +1203,16 @@ def get_unstructured(value): # otherwise, performing it on an invalid encoded word can cause # the parser to go in an infinite loop. if valid_ew and rfc2047_matcher.search(tok): - tok, *remainder = value.partition('=?') + ewstart = value.find('=?', pos) + tok = value[pos:ewstart] + tokend = ewstart vtext = ValueTerminal(tok, 'utext') _validate_xtext(vtext) unstructured.append(vtext) - value = ''.join(remainder) + pos = tokend return unstructured -def get_qp_ctext(value): +def get_qp_ctext(value, pos=0): r"""ctext = This is not the RFC ctext, since we are handling nested comments in comment @@ -1194,12 +1224,12 @@ def get_qp_ctext(value): is ' '. """ - ptext, value, _ = _get_ptext_to_endchars(value, '()') + ptext, pos, _ = _get_ptext_to_endchars(value, pos, '()') ptext = WhiteSpaceTerminal(ptext, 'ptext') _validate_xtext(ptext) - return ptext, value + return ptext, pos -def get_qcontent(value): +def get_qcontent(value, pos=0): """qcontent = qtext / quoted-pair We allow anything except the DQUOTE character, but if we find any ASCII @@ -1209,53 +1239,53 @@ def get_qcontent(value): is a ValueTerminal. """ - ptext, value, _ = _get_ptext_to_endchars(value, '"') + ptext, pos, _ = _get_ptext_to_endchars(value, pos, '"') ptext = ValueTerminal(ptext, 'ptext') _validate_xtext(ptext) - return ptext, value + return ptext, pos -def get_atext(value): +def get_atext(value, pos=0): """atext = We allow any non-ATOM_ENDS in atext, but add an InvalidATextDefect to the token's defects list if we find non-atext characters. """ - m = _non_atom_end_matcher(value) + m = _non_atom_end_matcher(value, pos) if not m: raise errors.HeaderParseError( - "expected atext but found '{}'".format(value)) + "expected atext but found '{}'".format(_tail(value, pos))) atext = m.group() - value = value[len(atext):] + pos = m.end() atext = ValueTerminal(atext, 'atext') _validate_xtext(atext) - return atext, value + return atext, pos -def get_bare_quoted_string(value): +def get_bare_quoted_string(value, pos=0): """bare-quoted-string = DQUOTE *([FWS] qcontent) [FWS] DQUOTE A quoted-string without the leading or trailing white space. Its value is the text between the quote marks, with whitespace preserved and quoted pairs decoded. """ - if not value or value[0] != '"': + if pos >= len(value) or value[pos] != '"': raise errors.HeaderParseError( - "expected '\"' but found '{}'".format(value)) + "expected '\"' but found '{}'".format(_tail(value, pos))) bare_quoted_string = BareQuotedString() - value = value[1:] - if value and value[0] == '"': - return bare_quoted_string, value[1:] - while value and value[0] != '"': - if value[0] in WSP: - token, value = get_fws(value) - elif value[:2] == '=?': + pos += 1 + if pos < len(value) and value[pos] == '"': + return bare_quoted_string, pos + 1 + while pos < len(value) and value[pos] != '"': + if value[pos] in WSP: + token, pos = get_fws(value, pos) + elif value.startswith('=?', pos): valid_ew = False try: - token, value = get_encoded_word(value) + token, pos = get_encoded_word(value, pos) bare_quoted_string.defects.append(errors.InvalidHeaderDefect( "encoded word inside quoted string")) valid_ew = True except errors.HeaderParseError: - token, value = get_qcontent(value) + token, pos = get_qcontent(value, pos) # Collapse the whitespace between two encoded words that occur in a # bare-quoted-string. if valid_ew and len(bare_quoted_string) > 1: @@ -1264,53 +1294,53 @@ def get_bare_quoted_string(value): bare_quoted_string[-1] = EWWhiteSpaceTerminal( bare_quoted_string[-1], 'fws') else: - token, value = get_qcontent(value) + token, pos = get_qcontent(value, pos) bare_quoted_string.append(token) - if not value: + if pos >= len(value): bare_quoted_string.defects.append(errors.InvalidHeaderDefect( "end of header inside quoted string")) - return bare_quoted_string, value - return bare_quoted_string, value[1:] + return bare_quoted_string, pos + return bare_quoted_string, pos + 1 -def get_comment(value): +def get_comment(value, pos=0): """comment = "(" *([FWS] ccontent) [FWS] ")" ccontent = ctext / quoted-pair / comment We handle nested comments here, and quoted-pair in our qp-ctext routine. """ - if value and value[0] != '(': + if pos < len(value) and value[pos] != '(': raise errors.HeaderParseError( - "expected '(' but found '{}'".format(value)) + "expected '(' but found '{}'".format(_tail(value, pos))) comment = Comment() - value = value[1:] - while value and value[0] != ")": - if value[0] in WSP: - token, value = get_fws(value) - elif value[0] == '(': - token, value = get_comment(value) + pos += 1 + while pos < len(value) and value[pos] != ")": + if value[pos] in WSP: + token, pos = get_fws(value, pos) + elif value[pos] == '(': + token, pos = get_comment(value, pos) else: - token, value = get_qp_ctext(value) + token, pos = get_qp_ctext(value, pos) comment.append(token) - if not value: + if pos >= len(value): comment.defects.append(errors.InvalidHeaderDefect( "end of header inside comment")) - return comment, value - return comment, value[1:] + return comment, pos + return comment, pos + 1 -def get_cfws(value): +def get_cfws(value, pos=0): """CFWS = (1*([FWS] comment) [FWS]) / FWS """ cfws = CFWSList() - while value and value[0] in CFWS_LEADER: - if value[0] in WSP: - token, value = get_fws(value) + while pos < len(value) and value[pos] in CFWS_LEADER: + if value[pos] in WSP: + token, pos = get_fws(value, pos) else: - token, value = get_comment(value) + token, pos = get_comment(value, pos) cfws.append(token) - return cfws, value + return cfws, pos -def get_quoted_string(value): +def get_quoted_string(value, pos=0): """quoted-string = [CFWS] [CFWS] 'bare-quoted-string' is an intermediate class defined by this @@ -1318,88 +1348,88 @@ def get_quoted_string(value): without any attached CFWS. """ quoted_string = QuotedString() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) quoted_string.append(token) - token, value = get_bare_quoted_string(value) + token, pos = get_bare_quoted_string(value, pos) quoted_string.append(token) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) quoted_string.append(token) - return quoted_string, value + return quoted_string, pos -def get_atom(value): +def get_atom(value, pos=0): """atom = [CFWS] 1*atext [CFWS] An atom could be an rfc2047 encoded word. """ atom = Atom() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) atom.append(token) - if value and value[0] in ATOM_ENDS: + if pos < len(value) and value[pos] in ATOM_ENDS: raise errors.HeaderParseError( - "expected atom but found '{}'".format(value)) - if value.startswith('=?'): + "expected atom but found '{}'".format(_tail(value, pos))) + if value.startswith('=?', pos): try: - token, value = get_encoded_word(value) + token, pos = get_encoded_word(value, pos) except errors.HeaderParseError: # XXX: need to figure out how to register defects when # appropriate here. - token, value = get_atext(value) + token, pos = get_atext(value, pos) else: - token, value = get_atext(value) + token, pos = get_atext(value, pos) atom.append(token) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) atom.append(token) - return atom, value + return atom, pos -def get_dot_atom_text(value): +def get_dot_atom_text(value, pos=0): """ dot-text = 1*atext *("." 1*atext) """ dot_atom_text = DotAtomText() - if not value or value[0] in ATOM_ENDS: + if pos >= len(value) or value[pos] in ATOM_ENDS: raise errors.HeaderParseError("expected atom at a start of " - "dot-atom-text but found '{}'".format(value)) - while value and value[0] not in ATOM_ENDS: - token, value = get_atext(value) + "dot-atom-text but found '{}'".format(_tail(value, pos))) + while pos < len(value) and value[pos] not in ATOM_ENDS: + token, pos = get_atext(value, pos) dot_atom_text.append(token) - if value and value[0] == '.': + if pos < len(value) and value[pos] == '.': dot_atom_text.append(DOT) - value = value[1:] + pos += 1 if dot_atom_text[-1] is DOT: raise errors.HeaderParseError("expected atom at end of dot-atom-text " - "but found '{}'".format('.'+value)) - return dot_atom_text, value + "but found '{}'".format('.'+_tail(value, pos))) + return dot_atom_text, pos -def get_dot_atom(value): +def get_dot_atom(value, pos=0): """ dot-atom = [CFWS] dot-atom-text [CFWS] Any place we can have a dot atom, we could instead have an rfc2047 encoded word. """ dot_atom = DotAtom() - if value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) dot_atom.append(token) - if value.startswith('=?'): + if value.startswith('=?', pos): try: - token, value = get_encoded_word(value) + token, pos = get_encoded_word(value, pos) except errors.HeaderParseError: # XXX: need to figure out how to register defects when # appropriate here. - token, value = get_dot_atom_text(value) + token, pos = get_dot_atom_text(value, pos) else: - token, value = get_dot_atom_text(value) + token, pos = get_dot_atom_text(value, pos) dot_atom.append(token) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) dot_atom.append(token) - return dot_atom, value + return dot_atom, pos -def get_word(value): +def get_word(value, pos=0): """word = atom / quoted-string Either atom or quoted-string may start with CFWS. We have to peel off this @@ -1415,25 +1445,25 @@ def get_word(value): parse tree is more confusing than it is helpful. """ - if value[0] in CFWS_LEADER: - leader, value = get_cfws(value) + if value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) else: leader = None - if not value: + if pos >= len(value): raise errors.HeaderParseError( "Expected 'atom' or 'quoted-string' but found nothing.") - if value[0]=='"': - token, value = get_quoted_string(value) - elif value[0] in SPECIALS: + if value[pos]=='"': + token, pos = get_quoted_string(value, pos) + elif value[pos] in SPECIALS: raise errors.HeaderParseError("Expected 'atom' or 'quoted-string' " - "but found '{}'".format(value)) + "but found '{}'".format(_tail(value, pos))) else: - token, value = get_atom(value) + token, pos = get_atom(value, pos) if leader is not None: token[:0] = [leader] - return token, value + return token, pos -def get_phrase(value): +def get_phrase(value, pos=0): """ phrase = 1*word / obs-phrase obs-phrase = word *(word / "." / CFWS) @@ -1447,20 +1477,20 @@ def get_phrase(value): """ phrase = Phrase() try: - token, value = get_word(value) + token, pos = get_word(value, pos) phrase.append(token) except errors.HeaderParseError: phrase.defects.append(errors.InvalidHeaderDefect( "phrase does not start with word")) - while value and value[0] not in PHRASE_ENDS: - if value[0]=='.': + while pos < len(value) and value[pos] not in PHRASE_ENDS: + if value[pos]=='.': phrase.append(DOT) phrase.defects.append(errors.ObsoleteHeaderDefect( "period in 'phrase'")) - value = value[1:] + pos += 1 else: try: - token, value = get_word(value) + token, pos = get_word(value, pos) if (token[0].token_type == 'encoded-word' and phrase and phrase[-1].token_type == 'atom' @@ -1472,40 +1502,49 @@ def get_phrase(value): # linear ws between ews needs special handing... phrase[-1][-1] = EWWhiteSpaceTerminal(phrase[-1], 'fws') except errors.HeaderParseError: - if value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) phrase.defects.append(errors.ObsoleteHeaderDefect( "comment found without atom")) else: raise phrase.append(token) - return phrase, value + return phrase, pos -def get_local_part(value): +def get_local_part(value, pos=0): """ local-part = dot-atom / quoted-string / obs-local-part """ local_part = LocalPart() leader = None - if value and value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value: + start = pos + if pos < len(value) and value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value): raise errors.HeaderParseError( - "expected local-part but found '{}'".format(value)) + "expected local-part but found '{}'".format(_tail(value, pos))) try: - token, value = get_dot_atom(value) + token, pos = get_dot_atom(value, pos) except errors.HeaderParseError: try: - token, value = get_word(value) + token, pos = get_word(value, pos) except errors.HeaderParseError: - if value[0] != '\\' and value[0] in PHRASE_ENDS: + if value[pos] != '\\' and value[pos] in PHRASE_ENDS: raise token = TokenList() if leader is not None: token[:0] = [leader] local_part.append(token) - if value and (value[0]=='\\' or value[0] not in PHRASE_ENDS): - obs_local_part, value = get_obs_local_part(str(local_part) + value) + if pos < len(value) and (value[pos]=='\\' or value[pos] not in PHRASE_ENDS): + # The obsolete syntax requires us to re-parse the local-part as an + # obs-local-part. Re-parse directly from the original input (from the + # start of the local-part) rather than rebuilding and re-parsing the + # rendered ``str(local_part) + remaining`` text. The latter is both + # quadratic for an address-list of obs local-parts (gh-136063) and + # incorrect when a token rendered back to text differs from the source + # (e.g. an encoded-word decoding to a bare special such as '@'), which + # RFC 2047 anyway forbids in an addr-spec. + obs_local_part, pos = get_obs_local_part(value, start) if obs_local_part.token_type == 'invalid-obs-local-part': local_part.defects.append(errors.InvalidHeaderDefect( "local-part is not dot-atom, quoted-string, or obs-local-part")) @@ -1513,26 +1552,26 @@ def get_local_part(value): local_part.defects.append(errors.ObsoleteHeaderDefect( "local-part is not a dot-atom (contains CFWS)")) local_part[0] = obs_local_part - return local_part, value + return local_part, pos -def get_obs_local_part(value): +def get_obs_local_part(value, pos=0): """ obs-local-part = word *("." word) """ obs_local_part = ObsLocalPart() last_non_ws_was_dot = False - while value and (value[0]=='\\' or value[0] not in PHRASE_ENDS): - if value[0] == '.': + while pos < len(value) and (value[pos]=='\\' or value[pos] not in PHRASE_ENDS): + if value[pos] == '.': if last_non_ws_was_dot: obs_local_part.defects.append(errors.InvalidHeaderDefect( "invalid repeated '.'")) obs_local_part.append(DOT) last_non_ws_was_dot = True - value = value[1:] + pos += 1 continue - elif value[0]=='\\': - obs_local_part.append(ValueTerminal(value[0], + elif value[pos]=='\\': + obs_local_part.append(ValueTerminal(value[pos], 'misplaced-special')) - value = value[1:] + pos += 1 obs_local_part.defects.append(errors.InvalidHeaderDefect( "'\\' character outside of quoted-string/ccontent")) last_non_ws_was_dot = False @@ -1541,16 +1580,16 @@ def get_obs_local_part(value): obs_local_part.defects.append(errors.InvalidHeaderDefect( "missing '.' between words")) try: - token, value = get_word(value) + token, pos = get_word(value, pos) last_non_ws_was_dot = False except errors.HeaderParseError: - if value[0] not in CFWS_LEADER: + if value[pos] not in CFWS_LEADER: raise - token, value = get_cfws(value) + token, pos = get_cfws(value, pos) obs_local_part.append(token) if not obs_local_part: raise errors.HeaderParseError( - "expected obs-local-part but found '{}'".format(value)) + "expected obs-local-part but found '{}'".format(_tail(value, pos))) if (obs_local_part[0].token_type == 'dot' or obs_local_part[0].token_type=='cfws' and len(obs_local_part) > 1 and @@ -1565,9 +1604,9 @@ def get_obs_local_part(value): "Invalid trailing '.' in local part")) if obs_local_part.defects: obs_local_part.token_type = 'invalid-obs-local-part' - return obs_local_part, value + return obs_local_part, pos -def get_dtext(value): +def get_dtext(value, pos=0): r""" dtext = / obs-dtext obs-dtext = obs-NO-WS-CTL / quoted-pair @@ -1579,116 +1618,116 @@ def get_dtext(value): added to the returned token's defect list. """ - ptext, value, had_qp = _get_ptext_to_endchars(value, '[]') + ptext, pos, had_qp = _get_ptext_to_endchars(value, pos, '[]') ptext = ValueTerminal(ptext, 'ptext') if had_qp: ptext.defects.append(errors.ObsoleteHeaderDefect( "quoted printable found in domain-literal")) _validate_xtext(ptext) - return ptext, value + return ptext, pos -def _check_for_early_dl_end(value, domain_literal): - if value: +def _check_for_early_dl_end(value, pos, domain_literal): + if pos < len(value): return False domain_literal.defects.append(errors.InvalidHeaderDefect( "end of input inside domain-literal")) domain_literal.append(ValueTerminal(']', 'domain-literal-end')) return True -def get_domain_literal(value): +def get_domain_literal(value, pos=0): """ domain-literal = [CFWS] "[" *([FWS] dtext) [FWS] "]" [CFWS] """ domain_literal = DomainLiteral() - if value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) domain_literal.append(token) - if not value: + if pos >= len(value): raise errors.HeaderParseError("expected domain-literal") - if value[0] != '[': + if value[pos] != '[': raise errors.HeaderParseError("expected '[' at start of domain-literal " - "but found '{}'".format(value)) - value = value[1:] + "but found '{}'".format(_tail(value, pos))) + pos += 1 domain_literal.append(ValueTerminal('[', 'domain-literal-start')) - if _check_for_early_dl_end(value, domain_literal): - return domain_literal, value - if value[0] in WSP: - token, value = get_fws(value) + if _check_for_early_dl_end(value, pos, domain_literal): + return domain_literal, pos + if value[pos] in WSP: + token, pos = get_fws(value, pos) domain_literal.append(token) - token, value = get_dtext(value) + token, pos = get_dtext(value, pos) domain_literal.append(token) - if _check_for_early_dl_end(value, domain_literal): - return domain_literal, value - if value[0] in WSP: - token, value = get_fws(value) + if _check_for_early_dl_end(value, pos, domain_literal): + return domain_literal, pos + if value[pos] in WSP: + token, pos = get_fws(value, pos) domain_literal.append(token) - if _check_for_early_dl_end(value, domain_literal): - return domain_literal, value - if value[0] != ']': + if _check_for_early_dl_end(value, pos, domain_literal): + return domain_literal, pos + if value[pos] != ']': raise errors.HeaderParseError("expected ']' at end of domain-literal " - "but found '{}'".format(value)) + "but found '{}'".format(_tail(value, pos))) domain_literal.append(ValueTerminal(']', 'domain-literal-end')) - value = value[1:] - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + pos += 1 + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) domain_literal.append(token) - return domain_literal, value + return domain_literal, pos -def get_domain(value): +def get_domain(value, pos=0): """ domain = dot-atom / domain-literal / obs-domain obs-domain = atom *("." atom)) """ domain = Domain() leader = None - if value and value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value: + if pos < len(value) and value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value): raise errors.HeaderParseError( - "expected domain but found '{}'".format(value)) - if value[0] == '[': - token, value = get_domain_literal(value) + "expected domain but found '{}'".format(_tail(value, pos))) + if value[pos] == '[': + token, pos = get_domain_literal(value, pos) if leader is not None: token[:0] = [leader] domain.append(token) - return domain, value + return domain, pos try: - token, value = get_dot_atom(value) + token, pos = get_dot_atom(value, pos) except errors.HeaderParseError: - token, value = get_atom(value) - if value and value[0] == '@': + token, pos = get_atom(value, pos) + if pos < len(value) and value[pos] == '@': raise errors.HeaderParseError('Invalid Domain') if leader is not None: token[:0] = [leader] domain.append(token) - if value and value[0] == '.': + if pos < len(value) and value[pos] == '.': domain.defects.append(errors.ObsoleteHeaderDefect( "domain is not a dot-atom (contains CFWS)")) if domain[0].token_type == 'dot-atom': domain[:] = domain[0] - while value and value[0] == '.': + while pos < len(value) and value[pos] == '.': domain.append(DOT) - token, value = get_atom(value[1:]) + token, pos = get_atom(value, pos + 1) domain.append(token) - return domain, value + return domain, pos -def get_addr_spec(value): +def get_addr_spec(value, pos=0): """ addr-spec = local-part "@" domain """ addr_spec = AddrSpec() - token, value = get_local_part(value) + token, pos = get_local_part(value, pos) addr_spec.append(token) - if not value or value[0] != '@': + if pos >= len(value) or value[pos] != '@': addr_spec.defects.append(errors.InvalidHeaderDefect( "addr-spec local part with no domain")) - return addr_spec, value + return addr_spec, pos addr_spec.append(ValueTerminal('@', 'address-at-symbol')) - token, value = get_domain(value[1:]) + token, pos = get_domain(value, pos + 1) addr_spec.append(token) - return addr_spec, value + return addr_spec, pos -def get_obs_route(value): +def get_obs_route(value, pos=0): """ obs-route = obs-domain-list ":" obs-domain-list = *(CFWS / ",") "@" domain *("," [CFWS] ["@" domain]) @@ -1696,88 +1735,88 @@ def get_obs_route(value): there is no obs-domain-list in the parse tree). """ obs_route = ObsRoute() - while value and (value[0]==',' or value[0] in CFWS_LEADER): - if value[0] in CFWS_LEADER: - token, value = get_cfws(value) + while pos < len(value) and (value[pos]==',' or value[pos] in CFWS_LEADER): + if value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) obs_route.append(token) - elif value[0] == ',': + elif value[pos] == ',': obs_route.append(ListSeparator) - value = value[1:] - if not value or value[0] != '@': + pos += 1 + if pos >= len(value) or value[pos] != '@': raise errors.HeaderParseError( - "expected obs-route domain but found '{}'".format(value)) + "expected obs-route domain but found '{}'".format(_tail(value, pos))) obs_route.append(RouteComponentMarker) - token, value = get_domain(value[1:]) + token, pos = get_domain(value, pos + 1) obs_route.append(token) - while value and value[0]==',': + while pos < len(value) and value[pos]==',': obs_route.append(ListSeparator) - value = value[1:] - if not value: + pos += 1 + if pos >= len(value): break - if value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) obs_route.append(token) - if not value: + if pos >= len(value): break - if value[0] == '@': + if value[pos] == '@': obs_route.append(RouteComponentMarker) - token, value = get_domain(value[1:]) + token, pos = get_domain(value, pos + 1) obs_route.append(token) - if not value: + if pos >= len(value): raise errors.HeaderParseError("end of header while parsing obs-route") - if value[0] != ':': + if value[pos] != ':': raise errors.HeaderParseError( "expected ':' marking end of " - "obs-route but found '{}'".format(value)) + "obs-route but found '{}'".format(_tail(value, pos))) obs_route.append(ValueTerminal(':', 'end-of-obs-route-marker')) - return obs_route, value[1:] + return obs_route, pos + 1 -def get_angle_addr(value): +def get_angle_addr(value, pos=0): """ angle-addr = [CFWS] "<" addr-spec ">" [CFWS] / obs-angle-addr obs-angle-addr = [CFWS] "<" obs-route addr-spec ">" [CFWS] """ angle_addr = AngleAddr() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) angle_addr.append(token) - if not value or value[0] != '<': + if pos >= len(value) or value[pos] != '<': raise errors.HeaderParseError( - "expected angle-addr but found '{}'".format(value)) + "expected angle-addr but found '{}'".format(_tail(value, pos))) angle_addr.append(ValueTerminal('<', 'angle-addr-start')) - value = value[1:] + pos += 1 # Although it is not legal per RFC5322, SMTP uses '<>' in certain # circumstances. - if value and value[0] == '>': + if pos < len(value) and value[pos] == '>': angle_addr.append(ValueTerminal('>', 'angle-addr-end')) angle_addr.defects.append(errors.InvalidHeaderDefect( "null addr-spec in angle-addr")) - value = value[1:] - return angle_addr, value + pos += 1 + return angle_addr, pos try: - token, value = get_addr_spec(value) + token, pos = get_addr_spec(value, pos) except errors.HeaderParseError: try: - token, value = get_obs_route(value) + token, pos = get_obs_route(value, pos) angle_addr.defects.append(errors.ObsoleteHeaderDefect( "obsolete route specification in angle-addr")) except errors.HeaderParseError: raise errors.HeaderParseError( - "expected addr-spec or obs-route but found '{}'".format(value)) + "expected addr-spec or obs-route but found '{}'".format(_tail(value, pos))) angle_addr.append(token) - token, value = get_addr_spec(value) + token, pos = get_addr_spec(value, pos) angle_addr.append(token) - if value and value[0] == '>': - value = value[1:] + if pos < len(value) and value[pos] == '>': + pos += 1 else: angle_addr.defects.append(errors.InvalidHeaderDefect( "missing trailing '>' on angle-addr")) angle_addr.append(ValueTerminal('>', 'angle-addr-end')) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) angle_addr.append(token) - return angle_addr, value + return angle_addr, pos -def get_display_name(value): +def get_display_name(value, pos=0): """ display-name = phrase Because this is simply a name-rule, we don't return a display-name @@ -1786,33 +1825,33 @@ def get_display_name(value): """ display_name = DisplayName() - token, value = get_phrase(value) + token, pos = get_phrase(value, pos) display_name.extend(token[:]) display_name.defects = token.defects[:] - return display_name, value + return display_name, pos -def get_name_addr(value): +def get_name_addr(value, pos=0): """ name-addr = [display-name] angle-addr """ name_addr = NameAddr() # Both the optional display name and the angle-addr can start with cfws. leader = None - if not value: + if pos >= len(value): raise errors.HeaderParseError( - "expected name-addr but found '{}'".format(value)) - if value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value: + "expected name-addr but found '{}'".format(_tail(value, pos))) + if value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value): raise errors.HeaderParseError( "expected name-addr but found '{}'".format(leader)) - if value[0] != '<': - if value[0] in PHRASE_ENDS: + if value[pos] != '<': + if value[pos] in PHRASE_ENDS: raise errors.HeaderParseError( - "expected name-addr but found '{}'".format(value)) - token, value = get_display_name(value) - if not value: + "expected name-addr but found '{}'".format(_tail(value, pos))) + token, pos = get_display_name(value, pos) + if pos >= len(value): raise errors.HeaderParseError( "expected name-addr but found '{}'".format(token)) if leader is not None: @@ -1822,13 +1861,13 @@ def get_name_addr(value): token[:0] = [leader] leader = None name_addr.append(token) - token, value = get_angle_addr(value) + token, pos = get_angle_addr(value, pos) if leader is not None: token[:0] = [leader] name_addr.append(token) - return name_addr, value + return name_addr, pos -def get_mailbox(value): +def get_mailbox(value, pos=0): """ mailbox = name-addr / addr-spec """ @@ -1836,20 +1875,20 @@ def get_mailbox(value): # addr-spec is to try parsing each one. mailbox = Mailbox() try: - token, value = get_name_addr(value) + token, pos = get_name_addr(value, pos) except errors.HeaderParseError: try: - token, value = get_addr_spec(value) + token, pos = get_addr_spec(value, pos) except errors.HeaderParseError: raise errors.HeaderParseError( - "expected mailbox but found '{}'".format(value)) + "expected mailbox but found '{}'".format(_tail(value, pos))) if any(isinstance(x, errors.InvalidHeaderDefect) for x in token.all_defects): mailbox.token_type = 'invalid-mailbox' mailbox.append(token) - return mailbox, value + return mailbox, pos -def get_invalid_mailbox(value, endchars): +def get_invalid_mailbox(value, pos, endchars): """ Read everything up to one of the chars in endchars. This is outside the formal grammar. The InvalidMailbox TokenList that is @@ -1857,17 +1896,17 @@ def get_invalid_mailbox(value, endchars): """ invalid_mailbox = InvalidMailbox() - while value and value[0] not in endchars: - if value[0] in PHRASE_ENDS: - invalid_mailbox.append(ValueTerminal(value[0], + while pos < len(value) and value[pos] not in endchars: + if value[pos] in PHRASE_ENDS: + invalid_mailbox.append(ValueTerminal(value[pos], 'misplaced-special')) - value = value[1:] + pos += 1 else: - token, value = get_phrase(value) + token, pos = get_phrase(value, pos) invalid_mailbox.append(token) - return invalid_mailbox, value + return invalid_mailbox, pos -def get_mailbox_list(value): +def get_mailbox_list(value, pos=0): """ mailbox-list = (mailbox *("," mailbox)) / obs-mbox-list obs-mbox-list = *([CFWS] ",") mailbox *("," [mailbox / CFWS]) @@ -1880,118 +1919,118 @@ def get_mailbox_list(value): """ mailbox_list = MailboxList() - while value and value[0] != ';': + while pos < len(value) and value[pos] != ';': try: - token, value = get_mailbox(value) + token, pos = get_mailbox(value, pos) mailbox_list.append(token) except errors.HeaderParseError: leader = None - if value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value or value[0] in ',;': + if value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value) or value[pos] in ',;': mailbox_list.append(leader) mailbox_list.defects.append(errors.ObsoleteHeaderDefect( "empty element in mailbox-list")) else: - token, value = get_invalid_mailbox(value, ',;') + token, pos = get_invalid_mailbox(value, pos, ',;') if leader is not None: token[:0] = [leader] mailbox_list.append(token) mailbox_list.defects.append(errors.InvalidHeaderDefect( "invalid mailbox in mailbox-list")) - elif value[0] == ',': + elif value[pos] == ',': mailbox_list.defects.append(errors.ObsoleteHeaderDefect( "empty element in mailbox-list")) else: - token, value = get_invalid_mailbox(value, ',;') + token, pos = get_invalid_mailbox(value, pos, ',;') if leader is not None: token[:0] = [leader] mailbox_list.append(token) mailbox_list.defects.append(errors.InvalidHeaderDefect( "invalid mailbox in mailbox-list")) - if value and value[0] not in ',;': + if pos < len(value) and value[pos] not in ',;': # Crap after mailbox; treat it as an invalid mailbox. # The mailbox info will still be available. mailbox = mailbox_list[-1] mailbox.token_type = 'invalid-mailbox' - token, value = get_invalid_mailbox(value, ',;') + token, pos = get_invalid_mailbox(value, pos, ',;') mailbox.extend(token) mailbox_list.defects.append(errors.InvalidHeaderDefect( "invalid mailbox in mailbox-list")) - if value and value[0] == ',': + if pos < len(value) and value[pos] == ',': mailbox_list.append(ListSeparator) - value = value[1:] - return mailbox_list, value + pos += 1 + return mailbox_list, pos -def get_group_list(value): +def get_group_list(value, pos=0): """ group-list = mailbox-list / CFWS / obs-group-list obs-group-list = 1*([CFWS] ",") [CFWS] """ group_list = GroupList() - if not value: + if pos >= len(value): group_list.defects.append(errors.InvalidHeaderDefect( "end of header before group-list")) - return group_list, value + return group_list, pos leader = None - if value and value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value: + if pos < len(value) and value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value): # This should never happen in email parsing, since CFWS-only is a # legal alternative to group-list in a group, which is the only # place group-list appears. group_list.defects.append(errors.InvalidHeaderDefect( "end of header in group-list")) group_list.append(leader) - return group_list, value - if value[0] == ';': + return group_list, pos + if value[pos] == ';': group_list.append(leader) - return group_list, value - token, value = get_mailbox_list(value) + return group_list, pos + token, pos = get_mailbox_list(value, pos) if len(token.all_mailboxes)==0: if leader is not None: group_list.append(leader) group_list.extend(token) group_list.defects.append(errors.ObsoleteHeaderDefect( "group-list with empty entries")) - return group_list, value + return group_list, pos if leader is not None: token[:0] = [leader] group_list.append(token) - return group_list, value + return group_list, pos -def get_group(value): +def get_group(value, pos=0): """ group = display-name ":" [group-list] ";" [CFWS] """ group = Group() - token, value = get_display_name(value) - if not value or value[0] != ':': + token, pos = get_display_name(value, pos) + if pos >= len(value) or value[pos] != ':': raise errors.HeaderParseError("expected ':' at end of group " - "display name but found '{}'".format(value)) + "display name but found '{}'".format(_tail(value, pos))) group.append(token) group.append(ValueTerminal(':', 'group-display-name-terminator')) - value = value[1:] - if value and value[0] == ';': + pos += 1 + if pos < len(value) and value[pos] == ';': group.append(ValueTerminal(';', 'group-terminator')) - return group, value[1:] - token, value = get_group_list(value) + return group, pos + 1 + token, pos = get_group_list(value, pos) group.append(token) - if not value: + if pos >= len(value): group.defects.append(errors.InvalidHeaderDefect( "end of header in group")) - elif value[0] != ';': + elif value[pos] != ';': raise errors.HeaderParseError( - "expected ';' at end of group but found {}".format(value)) + "expected ';' at end of group but found {}".format(_tail(value, pos))) group.append(ValueTerminal(';', 'group-terminator')) - value = value[1:] - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + pos += 1 + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) group.append(token) - return group, value + return group, pos -def get_address(value): +def get_address(value, pos=0): """ address = mailbox / group Note that counter-intuitively, an address can be either a single address or @@ -2010,17 +2049,17 @@ def get_address(value): # would be a premature optimization. address = Address() try: - token, value = get_group(value) + token, pos = get_group(value, pos) except errors.HeaderParseError: try: - token, value = get_mailbox(value) + token, pos = get_mailbox(value, pos) except errors.HeaderParseError: raise errors.HeaderParseError( - "expected address but found '{}'".format(value)) + "expected address but found '{}'".format(_tail(value, pos))) address.append(token) - return address, value + return address, pos -def get_address_list(value): +def get_address_list(value, pos=0): """ address_list = (address *("," address)) / obs-addr-list obs-addr-list = *([CFWS] ",") address *("," [address / CFWS]) @@ -2031,137 +2070,137 @@ def get_address_list(value): """ address_list = AddressList() - while value: + while pos < len(value): try: - token, value = get_address(value) + token, pos = get_address(value, pos) address_list.append(token) except errors.HeaderParseError: leader = None - if value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value or value[0] == ',': + if value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value) or value[pos] == ',': address_list.append(leader) address_list.defects.append(errors.ObsoleteHeaderDefect( "address-list entry with no content")) else: - token, value = get_invalid_mailbox(value, ',') + token, pos = get_invalid_mailbox(value, pos, ',') if leader is not None: token[:0] = [leader] address_list.append(Address([token])) address_list.defects.append(errors.InvalidHeaderDefect( "invalid address in address-list")) - elif value[0] == ',': + elif value[pos] == ',': address_list.defects.append(errors.ObsoleteHeaderDefect( "empty element in address-list")) else: - token, value = get_invalid_mailbox(value, ',') + token, pos = get_invalid_mailbox(value, pos, ',') if leader is not None: token[:0] = [leader] address_list.append(Address([token])) address_list.defects.append(errors.InvalidHeaderDefect( "invalid address in address-list")) - if value and value[0] != ',': + if pos < len(value) and value[pos] != ',': # Crap after address: add it to the address list # as an invalid mailbox - token, value = get_invalid_mailbox(value, ',') + token, pos = get_invalid_mailbox(value, pos, ',') address_list.append(Address([token])) address_list.defects.append(errors.InvalidHeaderDefect( "invalid address in address-list")) - if value: # Must be a , at this point. + if pos < len(value): # Must be a , at this point. address_list.append(ListSeparator) - value = value[1:] - return address_list, value + pos += 1 + return address_list, pos -def get_no_fold_literal(value): +def get_no_fold_literal(value, pos=0): """ no-fold-literal = "[" *dtext "]" """ no_fold_literal = NoFoldLiteral() - if not value: + if pos >= len(value): raise errors.HeaderParseError( - "expected no-fold-literal but found '{}'".format(value)) - if value[0] != '[': + "expected no-fold-literal but found '{}'".format(_tail(value, pos))) + if value[pos] != '[': raise errors.HeaderParseError( "expected '[' at the start of no-fold-literal " - "but found '{}'".format(value)) + "but found '{}'".format(_tail(value, pos))) no_fold_literal.append(ValueTerminal('[', 'no-fold-literal-start')) - value = value[1:] - token, value = get_dtext(value) + pos += 1 + token, pos = get_dtext(value, pos) no_fold_literal.append(token) - if not value or value[0] != ']': + if pos >= len(value) or value[pos] != ']': raise errors.HeaderParseError( "expected ']' at the end of no-fold-literal " - "but found '{}'".format(value)) + "but found '{}'".format(_tail(value, pos))) no_fold_literal.append(ValueTerminal(']', 'no-fold-literal-end')) - return no_fold_literal, value[1:] + return no_fold_literal, pos + 1 -def get_msg_id(value): +def get_msg_id(value, pos=0): """msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS] id-left = dot-atom-text / obs-id-left id-right = dot-atom-text / no-fold-literal / obs-id-right no-fold-literal = "[" *dtext "]" """ msg_id = MsgID() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) msg_id.append(token) - if not value or value[0] != '<': + if pos >= len(value) or value[pos] != '<': raise errors.HeaderParseError( - "expected msg-id but found '{}'".format(value)) + "expected msg-id but found '{}'".format(_tail(value, pos))) msg_id.append(ValueTerminal('<', 'msg-id-start')) - value = value[1:] + pos += 1 # Parse id-left. try: - token, value = get_dot_atom_text(value) + token, pos = get_dot_atom_text(value, pos) except errors.HeaderParseError: try: # obs-id-left is same as local-part of add-spec. - token, value = get_obs_local_part(value) + token, pos = get_obs_local_part(value, pos) msg_id.defects.append(errors.ObsoleteHeaderDefect( "obsolete id-left in msg-id")) except errors.HeaderParseError: raise errors.HeaderParseError( "expected dot-atom-text or obs-id-left" - " but found '{}'".format(value)) + " but found '{}'".format(_tail(value, pos))) msg_id.append(token) - if not value or value[0] != '@': + if pos >= len(value) or value[pos] != '@': msg_id.defects.append(errors.InvalidHeaderDefect( "msg-id with no id-right")) # Even though there is no id-right, if the local part # ends with `>` let's just parse it too and return # along with the defect. - if value and value[0] == '>': + if pos < len(value) and value[pos] == '>': msg_id.append(ValueTerminal('>', 'msg-id-end')) - value = value[1:] - return msg_id, value + pos += 1 + return msg_id, pos msg_id.append(ValueTerminal('@', 'address-at-symbol')) - value = value[1:] + pos += 1 # Parse id-right. try: - token, value = get_dot_atom_text(value) + token, pos = get_dot_atom_text(value, pos) except errors.HeaderParseError: try: - token, value = get_no_fold_literal(value) + token, pos = get_no_fold_literal(value, pos) except errors.HeaderParseError: try: - token, value = get_domain(value) + token, pos = get_domain(value, pos) msg_id.defects.append(errors.ObsoleteHeaderDefect( "obsolete id-right in msg-id")) except errors.HeaderParseError: raise errors.HeaderParseError( "expected dot-atom-text, no-fold-literal or obs-id-right" - " but found '{}'".format(value)) + " but found '{}'".format(_tail(value, pos))) msg_id.append(token) - if value and value[0] == '>': - value = value[1:] + if pos < len(value) and value[pos] == '>': + pos += 1 else: msg_id.defects.append(errors.InvalidHeaderDefect( "missing trailing '>' on msg-id")) msg_id.append(ValueTerminal('>', 'msg-id-end')) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) msg_id.append(token) - return msg_id, value + return msg_id, pos def parse_message_id(value): @@ -2169,7 +2208,7 @@ def parse_message_id(value): """ message_id = MessageID() try: - token, value = get_msg_id(value) + token, pos = get_msg_id(value) message_id.append(token) except errors.HeaderParseError as ex: token = get_unstructured(value) @@ -2178,9 +2217,9 @@ def parse_message_id(value): errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex))) else: # Value after parsing a valid msg_id should be None. - if value: + if pos < len(value): message_id.defects.append(errors.InvalidHeaderDefect( - "Unexpected {!r}".format(value))) + "Unexpected {!r}".format(_tail(value, pos)))) return message_id @@ -2189,21 +2228,22 @@ def parse_message_ids(value): references = "References:" 1*msg-id CRLF """ message_id_list = MessageIDList() - while value: - if value[0] == ',': + pos = 0 + while pos < len(value): + if value[pos] == ',': # message id list separated with commas - this is invalid, # but happens rather frequently in the wild message_id_list.defects.append( errors.InvalidHeaderDefect("comma in msg-id list")) message_id_list.append( WhiteSpaceTerminal(' ', 'invalid-comma-replacement')) - value = value[1:] + pos += 1 continue try: - token, value = get_msg_id(value) + token, pos = get_msg_id(value, pos) message_id_list.append(token) except errors.HeaderParseError as ex: - token = get_unstructured(value) + token = get_unstructured(value, pos) message_id_list.append(InvalidMessageID(token)) message_id_list.defects.append( errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex))) @@ -2229,16 +2269,17 @@ def parse_mime_version(value): mime_version.defects.append(errors.HeaderMissingRequiredValue( "Missing MIME version number (eg: 1.0)")) return mime_version - if value[0] in CFWS_LEADER: - token, value = get_cfws(value) + pos = 0 + if value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) mime_version.append(token) - if not value: + if pos >= len(value): mime_version.defects.append(errors.HeaderMissingRequiredValue( "Expected MIME version number but found only CFWS")) - digits = '' - while value and value[0] != '.' and value[0] not in CFWS_LEADER: - digits += value[0] - value = value[1:] + start = pos + while pos < len(value) and value[pos] != '.' and value[pos] not in CFWS_LEADER: + pos += 1 + digits = value[start:pos] if not digits.isdigit(): mime_version.defects.append(errors.InvalidHeaderDefect( "Expected MIME major version number but found {!r}".format(digits))) @@ -2246,30 +2287,30 @@ def parse_mime_version(value): else: mime_version.major = int(digits) mime_version.append(ValueTerminal(digits, 'digits')) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) mime_version.append(token) - if not value or value[0] != '.': + if pos >= len(value) or value[pos] != '.': if mime_version.major is not None: mime_version.defects.append(errors.InvalidHeaderDefect( "Incomplete MIME version; found only major number")) - if value: - mime_version.append(ValueTerminal(value, 'xtext')) + if pos < len(value): + mime_version.append(ValueTerminal(value[pos:], 'xtext')) return mime_version mime_version.append(ValueTerminal('.', 'version-separator')) - value = value[1:] - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + pos += 1 + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) mime_version.append(token) - if not value: + if pos >= len(value): if mime_version.major is not None: mime_version.defects.append(errors.InvalidHeaderDefect( "Incomplete MIME version; found only major number")) return mime_version - digits = '' - while value and value[0] not in CFWS_LEADER: - digits += value[0] - value = value[1:] + start = pos + while pos < len(value) and value[pos] not in CFWS_LEADER: + pos += 1 + digits = value[start:pos] if not digits.isdigit(): mime_version.defects.append(errors.InvalidHeaderDefect( "Expected MIME minor version number but found {!r}".format(digits))) @@ -2277,16 +2318,16 @@ def parse_mime_version(value): else: mime_version.minor = int(digits) mime_version.append(ValueTerminal(digits, 'digits')) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) mime_version.append(token) - if value: + if pos < len(value): mime_version.defects.append(errors.InvalidHeaderDefect( "Excess non-CFWS text after MIME version")) - mime_version.append(ValueTerminal(value, 'xtext')) + mime_version.append(ValueTerminal(value[pos:], 'xtext')) return mime_version -def get_invalid_parameter(value): +def get_invalid_parameter(value, pos=0): """ Read everything up to the next ';'. This is outside the formal grammar. The InvalidParameter TokenList that is @@ -2294,17 +2335,17 @@ def get_invalid_parameter(value): """ invalid_parameter = InvalidParameter() - while value and value[0] != ';': - if value[0] in PHRASE_ENDS: - invalid_parameter.append(ValueTerminal(value[0], + while pos < len(value) and value[pos] != ';': + if value[pos] in PHRASE_ENDS: + invalid_parameter.append(ValueTerminal(value[pos], 'misplaced-special')) - value = value[1:] + pos += 1 else: - token, value = get_phrase(value) + token, pos = get_phrase(value, pos) invalid_parameter.append(token) - return invalid_parameter, value + return invalid_parameter, pos -def get_ttext(value): +def get_ttext(value, pos=0): """ttext = We allow any non-TOKEN_ENDS in ttext, but add defects to the token's @@ -2313,17 +2354,17 @@ def get_ttext(value): because we follow the spirit of RFC 5322. """ - m = _non_token_end_matcher(value) + m = _non_token_end_matcher(value, pos) if not m: raise errors.HeaderParseError( - "expected ttext but found '{}'".format(value)) + "expected ttext but found '{}'".format(_tail(value, pos))) ttext = m.group() - value = value[len(ttext):] + pos = m.end() ttext = ValueTerminal(ttext, 'ttext') _validate_xtext(ttext) - return ttext, value + return ttext, pos -def get_token(value): +def get_token(value, pos=0): """token = [CFWS] 1*ttext [CFWS] The RFC equivalent of ttext is any US-ASCII chars except space, ctls, or @@ -2333,20 +2374,20 @@ def get_token(value): """ mtoken = Token() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) mtoken.append(token) - if value and value[0] in TOKEN_ENDS: + if pos < len(value) and value[pos] in TOKEN_ENDS: raise errors.HeaderParseError( - "expected token but found '{}'".format(value)) - token, value = get_ttext(value) + "expected token but found '{}'".format(_tail(value, pos))) + token, pos = get_ttext(value, pos) mtoken.append(token) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) mtoken.append(token) - return mtoken, value + return mtoken, pos -def get_attrtext(value): +def get_attrtext(value, pos=0): """attrtext = 1*(any non-ATTRIBUTE_ENDS character) We allow any non-ATTRIBUTE_ENDS in attrtext, but add defects to the @@ -2355,17 +2396,17 @@ def get_attrtext(value): them, because we follow the spirit of RFC 5322. """ - m = _non_attribute_end_matcher(value) + m = _non_attribute_end_matcher(value, pos) if not m: raise errors.HeaderParseError( - "expected attrtext but found {!r}".format(value)) + "expected attrtext but found {!r}".format(_tail(value, pos))) attrtext = m.group() - value = value[len(attrtext):] + pos = m.end() attrtext = ValueTerminal(attrtext, 'attrtext') _validate_xtext(attrtext) - return attrtext, value + return attrtext, pos -def get_attribute(value): +def get_attribute(value, pos=0): """ [CFWS] 1*attrtext [CFWS] This version of the BNF makes the CFWS explicit, and as usual we use a @@ -2375,20 +2416,20 @@ def get_attribute(value): """ attribute = Attribute() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) attribute.append(token) - if value and value[0] in ATTRIBUTE_ENDS: + if pos < len(value) and value[pos] in ATTRIBUTE_ENDS: raise errors.HeaderParseError( - "expected token but found '{}'".format(value)) - token, value = get_attrtext(value) + "expected token but found '{}'".format(_tail(value, pos))) + token, pos = get_attrtext(value, pos) attribute.append(token) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) attribute.append(token) - return attribute, value + return attribute, pos -def get_extended_attrtext(value): +def get_extended_attrtext(value, pos=0): """attrtext = 1*(any non-ATTRIBUTE_ENDS character plus '%') This is a special parsing routine so that we get a value that @@ -2396,17 +2437,17 @@ def get_extended_attrtext(value): string later). """ - m = _non_extended_attribute_end_matcher(value) + m = _non_extended_attribute_end_matcher(value, pos) if not m: raise errors.HeaderParseError( - "expected extended attrtext but found {!r}".format(value)) + "expected extended attrtext but found {!r}".format(_tail(value, pos))) attrtext = m.group() - value = value[len(attrtext):] + pos = m.end() attrtext = ValueTerminal(attrtext, 'extended-attrtext') _validate_xtext(attrtext) - return attrtext, value + return attrtext, pos -def get_extended_attribute(value): +def get_extended_attribute(value, pos=0): """ [CFWS] 1*extended_attrtext [CFWS] This is like the non-extended version except we allow % characters, so that @@ -2415,20 +2456,20 @@ def get_extended_attribute(value): """ # XXX: should we have an ExtendedAttribute TokenList? attribute = Attribute() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) attribute.append(token) - if value and value[0] in EXTENDED_ATTRIBUTE_ENDS: + if pos < len(value) and value[pos] in EXTENDED_ATTRIBUTE_ENDS: raise errors.HeaderParseError( - "expected token but found '{}'".format(value)) - token, value = get_extended_attrtext(value) + "expected token but found '{}'".format(_tail(value, pos))) + token, pos = get_extended_attrtext(value, pos) attribute.append(token) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + if pos < len(value) and value[pos] in CFWS_LEADER: + token, pos = get_cfws(value, pos) attribute.append(token) - return attribute, value + return attribute, pos -def get_section(value): +def get_section(value, pos=0): """ '*' digits The formal BNF is more complicated because leading 0s are not allowed. We @@ -2438,49 +2479,49 @@ def get_section(value): """ section = Section() - if not value or value[0] != '*': + if pos >= len(value) or value[pos] != '*': raise errors.HeaderParseError("Expected section but found {}".format( - value)) + value[pos:])) section.append(ValueTerminal('*', 'section-marker')) - value = value[1:] - if not value or not value[0].isdigit(): + pos += 1 + if pos >= len(value) or not value[pos].isdigit(): raise errors.HeaderParseError("Expected section number but " - "found {}".format(value)) - digits = '' - while value and value[0].isdigit(): - digits += value[0] - value = value[1:] + "found {}".format(_tail(value, pos))) + start = pos + while pos < len(value) and value[pos].isdigit(): + pos += 1 + digits = value[start:pos] if digits[0] == '0' and digits != '0': section.defects.append(errors.InvalidHeaderDefect( "section number has an invalid leading 0")) section.number = int(digits) section.append(ValueTerminal(digits, 'digits')) - return section, value + return section, pos -def get_value(value): +def get_value(value, pos=0): """ quoted-string / attribute """ v = Value() - if not value: + if pos >= len(value): raise errors.HeaderParseError("Expected value but found end of string") leader = None - if value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value: + if value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value): raise errors.HeaderParseError("Expected value but found " "only {}".format(leader)) - if value[0] == '"': - token, value = get_quoted_string(value) + if value[pos] == '"': + token, pos = get_quoted_string(value, pos) else: - token, value = get_extended_attribute(value) + token, pos = get_extended_attribute(value, pos) if leader is not None: token[:0] = [leader] v.append(token) - return v, value + return v, pos -def get_parameter(value): +def get_parameter(value, pos=0): """ attribute [section] ["*"] [CFWS] "=" value The CFWS is implied by the RFC but not made explicit in the BNF. This @@ -2492,55 +2533,60 @@ def get_parameter(value): # and the 'extended-attribute' marker (the '*') , but we've never seen that # in the wild and we will therefore ignore the possibility. param = Parameter() - token, value = get_attribute(value) + # ``cur`` is the string currently being scanned and ``pos`` the position + # in it. For most of this function ``cur`` is the original ``value``, but + # the extended-value hackery below switches it to a derived string while + # ``remainder`` keeps track of the resume position in the original value. + cur = value + token, pos = get_attribute(cur, pos) param.append(token) - if not value or value[0] == ';': + if pos >= len(cur) or cur[pos] == ';': param.defects.append(errors.InvalidHeaderDefect("Parameter contains " "name ({}) but no value".format(token))) - return param, value - if value[0] == '*': + return param, pos + if cur[pos] == '*': try: - token, value = get_section(value) + token, pos = get_section(cur, pos) param.sectioned = True param.append(token) except errors.HeaderParseError: pass - if not value: + if pos >= len(cur): raise errors.HeaderParseError("Incomplete parameter") - if value[0] == '*': + if cur[pos] == '*': param.append(ValueTerminal('*', 'extended-parameter-marker')) - value = value[1:] + pos += 1 param.extended = True - if value[0] != '=': + if cur[pos] != '=': raise errors.HeaderParseError("Parameter not followed by '='") param.append(ValueTerminal('=', 'parameter-separator')) - value = value[1:] - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) + pos += 1 + if pos < len(cur) and cur[pos] in CFWS_LEADER: + token, pos = get_cfws(cur, pos) param.append(token) remainder = None appendto = param - if param.extended and value and value[0] == '"': + if param.extended and pos < len(cur) and cur[pos] == '"': # Now for some serious hackery to handle the common invalid case of # double quotes around an extended value. We also accept (with defect) # a value marked as encoded that isn't really. - qstring, remainder = get_quoted_string(value) + qstring, remainder = get_quoted_string(cur, pos) inner_value = qstring.stripped_value semi_valid = False if param.section_number == 0: if inner_value and inner_value[0] == "'": semi_valid = True else: - token, rest = get_attrtext(inner_value) - if rest and rest[0] == "'": + token, rest = get_attrtext(inner_value, 0) + if rest < len(inner_value) and inner_value[rest] == "'": semi_valid = True else: try: - token, rest = get_extended_attrtext(inner_value) + token, rest = get_extended_attrtext(inner_value, 0) except: pass else: - if not rest: + if rest >= len(inner_value): semi_valid = True if semi_valid: param.defects.append(errors.InvalidHeaderDefect( @@ -2551,33 +2597,34 @@ def get_parameter(value): t[:] = [] appendto = t break - value = inner_value + cur = inner_value + pos = 0 else: remainder = None param.defects.append(errors.InvalidHeaderDefect( "Parameter marked as extended but appears to have a " "quoted string value that is non-encoded")) - if value and value[0] == "'": + if pos < len(cur) and cur[pos] == "'": token = None else: - token, value = get_value(value) + token, pos = get_value(cur, pos) if not param.extended or param.section_number > 0: - if not value or value[0] != "'": + if pos >= len(cur) or cur[pos] != "'": appendto.append(token) if remainder is not None: - assert not value, value - value = remainder - return param, value + assert pos >= len(cur), cur[pos:] + pos = remainder + return param, pos param.defects.append(errors.InvalidHeaderDefect( "Apparent initial-extended-value but attribute " "was not marked as extended or was not initial section")) - if not value: + if pos >= len(cur): # Assume the charset/lang is missing and the token is the value. param.defects.append(errors.InvalidHeaderDefect( "Missing required charset/lang delimiters")) appendto.append(token) if remainder is None: - return param, value + return param, pos else: if token is not None: for t in token: @@ -2586,40 +2633,40 @@ def get_parameter(value): t.token_type == 'attrtext' appendto.append(t) param.charset = t.value - if value[0] != "'": + if cur[pos] != "'": raise errors.HeaderParseError("Expected RFC2231 char/lang encoding " - "delimiter, but found {!r}".format(value)) + "delimiter, but found {!r}".format(_tail(cur, pos))) appendto.append(ValueTerminal("'", 'RFC2231-delimiter')) - value = value[1:] - if value and value[0] != "'": - token, value = get_attrtext(value) + pos += 1 + if pos < len(cur) and cur[pos] != "'": + token, pos = get_attrtext(cur, pos) appendto.append(token) param.lang = token.value - if not value or value[0] != "'": + if pos >= len(cur) or cur[pos] != "'": raise errors.HeaderParseError("Expected RFC2231 char/lang encoding " - "delimiter, but found {}".format(value)) + "delimiter, but found {}".format(_tail(cur, pos))) appendto.append(ValueTerminal("'", 'RFC2231-delimiter')) - value = value[1:] + pos += 1 if remainder is not None: # Treat the rest of value as bare quoted string content. v = Value() - while value: - if value[0] in WSP: - token, value = get_fws(value) - elif value[0] == '"': + while pos < len(cur): + if cur[pos] in WSP: + token, pos = get_fws(cur, pos) + elif cur[pos] == '"': token = ValueTerminal('"', 'DQUOTE') - value = value[1:] + pos += 1 else: - token, value = get_qcontent(value) + token, pos = get_qcontent(cur, pos) v.append(token) token = v else: - token, value = get_value(value) + token, pos = get_value(cur, pos) appendto.append(token) if remainder is not None: - assert not value, value - value = remainder - return param, value + assert pos >= len(cur), cur[pos:] + pos = remainder + return param, pos def parse_mime_parameters(value): """ parameter *( ";" parameter ) @@ -2635,59 +2682,60 @@ def parse_mime_parameters(value): """ mime_parameters = MimeParameters() - while value: + pos = 0 + while pos < len(value): try: - token, value = get_parameter(value) + token, pos = get_parameter(value, pos) mime_parameters.append(token) except errors.HeaderParseError: leader = None - if value[0] in CFWS_LEADER: - leader, value = get_cfws(value) - if not value: + if value[pos] in CFWS_LEADER: + leader, pos = get_cfws(value, pos) + if pos >= len(value): mime_parameters.append(leader) return mime_parameters - if value[0] == ';': + if value[pos] == ';': if leader is not None: mime_parameters.append(leader) mime_parameters.defects.append(errors.InvalidHeaderDefect( "parameter entry with no content")) else: - token, value = get_invalid_parameter(value) + token, pos = get_invalid_parameter(value, pos) if leader: token[:0] = [leader] mime_parameters.append(token) mime_parameters.defects.append(errors.InvalidHeaderDefect( "invalid parameter {!r}".format(token))) - if value and value[0] != ';': + if pos < len(value) and value[pos] != ';': # Junk after the otherwise valid parameter. Mark it as # invalid, but it will have a value. param = mime_parameters[-1] param.token_type = 'invalid-parameter' - token, value = get_invalid_parameter(value) + token, pos = get_invalid_parameter(value, pos) param.extend(token) mime_parameters.defects.append(errors.InvalidHeaderDefect( "parameter with invalid trailing text {!r}".format(token))) - if value: + if pos < len(value): # Must be a ';' at this point. mime_parameters.append(ValueTerminal(';', 'parameter-separator')) - value = value[1:] + pos += 1 return mime_parameters -def _find_mime_parameters(tokenlist, value): +def _find_mime_parameters(tokenlist, value, pos=0): """Do our best to find the parameters in an invalid MIME header """ - while value and value[0] != ';': - if value[0] in PHRASE_ENDS: - tokenlist.append(ValueTerminal(value[0], 'misplaced-special')) - value = value[1:] + while pos < len(value) and value[pos] != ';': + if value[pos] in PHRASE_ENDS: + tokenlist.append(ValueTerminal(value[pos], 'misplaced-special')) + pos += 1 else: - token, value = get_phrase(value) + token, pos = get_phrase(value, pos) tokenlist.append(token) - if not value: + if pos >= len(value): return tokenlist.append(ValueTerminal(';', 'parameter-separator')) - tokenlist.append(parse_mime_parameters(value[1:])) + tokenlist.append(parse_mime_parameters(value[pos + 1:])) def parse_content_type_header(value): """ maintype "/" subtype *( ";" parameter ) @@ -2701,48 +2749,49 @@ def parse_content_type_header(value): ctype.defects.append(errors.HeaderMissingRequiredValue( "Missing content type specification")) return ctype + pos = 0 try: - token, value = get_token(value) + token, pos = get_token(value, pos) except errors.HeaderParseError: ctype.defects.append(errors.InvalidHeaderDefect( - "Expected content maintype but found {!r}".format(value))) - _find_mime_parameters(ctype, value) + "Expected content maintype but found {!r}".format(_tail(value, pos)))) + _find_mime_parameters(ctype, value, pos) return ctype ctype.append(token) # XXX: If we really want to follow the formal grammar we should make # mantype and subtype specialized TokenLists here. Probably not worth it. - if not value or value[0] != '/': + if pos >= len(value) or value[pos] != '/': ctype.defects.append(errors.InvalidHeaderDefect( "Invalid content type")) - if value: - _find_mime_parameters(ctype, value) + if pos < len(value): + _find_mime_parameters(ctype, value, pos) return ctype ctype.maintype = token.value.strip().lower() ctype.append(ValueTerminal('/', 'content-type-separator')) - value = value[1:] + pos += 1 try: - token, value = get_token(value) + token, pos = get_token(value, pos) except errors.HeaderParseError: ctype.defects.append(errors.InvalidHeaderDefect( - "Expected content subtype but found {!r}".format(value))) - _find_mime_parameters(ctype, value) + "Expected content subtype but found {!r}".format(_tail(value, pos)))) + _find_mime_parameters(ctype, value, pos) return ctype ctype.append(token) ctype.subtype = token.value.strip().lower() - if not value: + if pos >= len(value): return ctype - if value[0] != ';': + if value[pos] != ';': ctype.defects.append(errors.InvalidHeaderDefect( "Only parameters are valid after content type, but " - "found {!r}".format(value))) + "found {!r}".format(_tail(value, pos)))) # The RFC requires that a syntactically invalid content-type be treated # as text/plain. Perhaps we should postel this, but we should probably # only do that if we were checking the subtype value against IANA. del ctype.maintype, ctype.subtype - _find_mime_parameters(ctype, value) + _find_mime_parameters(ctype, value, pos) return ctype ctype.append(ValueTerminal(';', 'parameter-separator')) - ctype.append(parse_mime_parameters(value[1:])) + ctype.append(parse_mime_parameters(value[pos + 1:])) return ctype def parse_content_disposition_header(value): @@ -2754,25 +2803,26 @@ def parse_content_disposition_header(value): disp_header.defects.append(errors.HeaderMissingRequiredValue( "Missing content disposition")) return disp_header + pos = 0 try: - token, value = get_token(value) + token, pos = get_token(value, pos) except errors.HeaderParseError: disp_header.defects.append(errors.InvalidHeaderDefect( - "Expected content disposition but found {!r}".format(value))) - _find_mime_parameters(disp_header, value) + "Expected content disposition but found {!r}".format(_tail(value, pos)))) + _find_mime_parameters(disp_header, value, pos) return disp_header disp_header.append(token) disp_header.content_disposition = token.value.strip().lower() - if not value: + if pos >= len(value): return disp_header - if value[0] != ';': + if value[pos] != ';': disp_header.defects.append(errors.InvalidHeaderDefect( "Only parameters are valid after content disposition, but " - "found {!r}".format(value))) - _find_mime_parameters(disp_header, value) + "found {!r}".format(_tail(value, pos)))) + _find_mime_parameters(disp_header, value, pos) return disp_header disp_header.append(ValueTerminal(';', 'parameter-separator')) - disp_header.append(parse_mime_parameters(value[1:])) + disp_header.append(parse_mime_parameters(value[pos + 1:])) return disp_header def parse_content_transfer_encoding_header(value): @@ -2785,24 +2835,25 @@ def parse_content_transfer_encoding_header(value): cte_header.defects.append(errors.HeaderMissingRequiredValue( "Missing content transfer encoding")) return cte_header + pos = 0 try: - token, value = get_token(value) + token, pos = get_token(value, pos) except errors.HeaderParseError: cte_header.defects.append(errors.InvalidHeaderDefect( - "Expected content transfer encoding but found {!r}".format(value))) + "Expected content transfer encoding but found {!r}".format(_tail(value, pos)))) else: cte_header.append(token) cte_header.cte = token.value.strip().lower() - if not value: + if pos >= len(value): return cte_header - while value: + while pos < len(value): cte_header.defects.append(errors.InvalidHeaderDefect( "Extra text after content transfer encoding")) - if value[0] in PHRASE_ENDS: - cte_header.append(ValueTerminal(value[0], 'misplaced-special')) - value = value[1:] + if value[pos] in PHRASE_ENDS: + cte_header.append(ValueTerminal(value[pos], 'misplaced-special')) + pos += 1 else: - token, value = get_phrase(value) + token, pos = get_phrase(value, pos) cte_header.append(token) return cte_header diff --git a/Lib/email/headerregistry.py b/Lib/email/headerregistry.py index 48cd85a65ba9f6..4c573f039b14cb 100644 --- a/Lib/email/headerregistry.py +++ b/Lib/email/headerregistry.py @@ -38,8 +38,8 @@ def __init__(self, display_name='', username='', domain='', addr_spec=None): if username or domain: raise TypeError("addrspec specified when username and/or " "domain also specified") - a_s, rest = parser.get_addr_spec(addr_spec) - if rest: + a_s, pos = parser.get_addr_spec(addr_spec) + if pos < len(addr_spec): raise ValueError("Invalid addr_spec; only '{}' " "could be parsed from '{}'".format( a_s, addr_spec)) @@ -328,8 +328,8 @@ class AddressHeader: @staticmethod def value_parser(value): - address_list, value = parser.get_address_list(value) - assert not value, 'this should not happen' + address_list, pos = parser.get_address_list(value) + assert pos >= len(value), 'this should not happen' return address_list @classmethod diff --git a/Lib/test/support/smtpd.py b/Lib/test/support/smtpd.py index 6537679db9ad24..d9b8366b922d32 100755 --- a/Lib/test/support/smtpd.py +++ b/Lib/test/support/smtpd.py @@ -437,12 +437,12 @@ def _getaddr(self, arg): if not arg: return '', '' if arg.lstrip().startswith('<'): - address, rest = get_angle_addr(arg) + address, pos = get_angle_addr(arg) else: - address, rest = get_addr_spec(arg) + address, pos = get_addr_spec(arg) if not address: - return address, rest - return address.addr_spec, rest + return address, arg[pos:] + return address.addr_spec, arg[pos:] def _getparams(self, params): # Return params as dictionary. Return None if not all parameters diff --git a/Lib/test/test_email/test__header_value_parser.py b/Lib/test/test_email/test__header_value_parser.py index 9d9fe418ee4d06..f7546f35b0391c 100644 --- a/Lib/test/test_email/test__header_value_parser.py +++ b/Lib/test/test_email/test__header_value_parser.py @@ -30,7 +30,8 @@ def _assert_results(self, tl, rest, string, value, defects, remainder, def _test_get_x(self, method, source, string, value, defects, remainder, comments=None): - tl, rest = method(source) + tl, pos = method(source, 0) + rest = source[pos:] self._assert_results(tl, rest, string, value, defects, remainder, comments=None) return tl @@ -44,25 +45,11 @@ def _test_parse_x(self, method, input, string, value, defects, class TestParser(TestParserMixin, TestEmailBase): - # _wsp_splitter - rfc_printable_ascii = bytes(range(33, 127)).decode('ascii') rfc_atext_chars = (string.ascii_letters + string.digits + "!#$%&\'*+-/=?^_`{}|~") rfc_dtext_chars = rfc_printable_ascii.translate(str.maketrans('','',r'\[]')) - def test__wsp_splitter_one_word(self): - self.assertEqual(parser._wsp_splitter('foo', 1), ['foo']) - - def test__wsp_splitter_two_words(self): - self.assertEqual(parser._wsp_splitter('foo def', 1), - ['foo', ' ', 'def']) - - def test__wsp_splitter_ws_runs(self): - self.assertEqual(parser._wsp_splitter('foo \t def jik', 1), - ['foo', ' \t ', 'def jik']) - - # get_fws def test_get_fws_only(self): @@ -172,9 +159,9 @@ def test_get_encoded_word_quopri_utf_escape_follows_cte(self): # get_unstructured - def _get_unst(self, value): - token = parser.get_unstructured(value) - return token, '' + def _get_unst(self, value, pos=0): + token = parser.get_unstructured(value, pos) + return token, len(value) def test_get_unstructured_null(self): self._test_get_x(self._get_unst, '', '', '', [], '') diff --git a/Misc/NEWS.d/next/Security/2026-06-28-00-00-00.gh-issue-136063.hvpIdx.rst b/Misc/NEWS.d/next/Security/2026-06-28-00-00-00.gh-issue-136063.hvpIdx.rst new file mode 100644 index 00000000000000..2a670c5b4330c7 --- /dev/null +++ b/Misc/NEWS.d/next/Security/2026-06-28-00-00-00.gh-issue-136063.hvpIdx.rst @@ -0,0 +1,11 @@ +Fixed multiple quadratic-complexity parsing issues in the :mod:`email` +header value parser that could be exploited for denial of service. The +parser now advances through the input using indices instead of repeatedly +slicing off the already-parsed prefix. + +As part of this change, an obsolete ``local-part`` is now re-parsed from the +original source text rather than from the decoded representation of the +already-parsed tokens. This only affects malformed addresses that contain an +:rfc:`2047` encoded-word inside an ``addr-spec``, which :rfc:`2047` does not +permit; such an encoded-word is no longer decoded and the address is reported +as invalid. Patch by Serhiy Storchaka.