fix(ble): replace hardcoded tuple indices with FormatToken NamedTuple

This commit is contained in:
luoxu
2026-04-16 19:44:23 +08:00
committed by Luo Xu
parent 3f2b6c97f8
commit b7760a77e4
2 changed files with 50 additions and 44 deletions

View File

@@ -38,6 +38,7 @@ from typing import cast
import tree_sitter_c as tsc
import yaml
from c_format_parse import FormatToken
from c_format_parse import parse_format_string
from inttypes_map import TYPES_MACRO_MAP
from LogDBManager import LogDBManager
@@ -354,14 +355,14 @@ class LogCompressor:
tokens_tuple_map: list[int] = []
need_args = 0
for idx, tk in enumerate(tokens):
if isinstance(tk, tuple):
if isinstance(tk, FormatToken):
tokens_tuple_map.append(idx)
need_args = need_args + 1
if tk[4] == '*': # dynamic width
if tk.width == '*': # dynamic width
need_args = need_args + 1
log_info['hexify'] = False
return log_info
if tk[5] == '*': # dynamic precision
if tk.precision == '*': # dynamic precision
need_args = need_args + 1
log_info['hexify'] = False
return log_info
@@ -376,7 +377,7 @@ class LogCompressor:
raise SyntaxError(f'LogSyntaxError:{node.text.decode("utf-8")}')
# Process each argument
for i, (token, arg_node) in enumerate(zip([t for t in tokens if isinstance(t, tuple)], arguments)):
for i, (token, arg_node) in enumerate(zip([t for t in tokens if isinstance(t, FormatToken)], arguments)):
arg_text = arg_node.text.decode('utf-8')
log_info['arguments'].append((arg_text, arg_node.start_byte, arg_node.end_byte))
@@ -386,13 +387,9 @@ class LogCompressor:
# Handle special identifiers
if arg_text in FUNC_MACROS:
token_list = list(token)
token_list[6] = '@func' # Modify conversion char to special marker
tokens[tokens_tuple_map[i]] = tuple(token_list)
tokens[tokens_tuple_map[i]] = token._replace(conv_char='@func')
elif arg_text in LINE_MACROS:
token_list = list(token)
token_list[6] = '@line'
tokens[tokens_tuple_map[i]] = tuple(token_list)
tokens[tokens_tuple_map[i]] = token._replace(conv_char='@line')
# Handle hex functions
if (
@@ -410,9 +407,7 @@ class LogCompressor:
len_node = abs(hex_func_info[2])
else:
len_node = hex_args.named_children[hex_func_info[2]].text.decode('utf-8')
token_list = list(token)
token_list[6] = f'@hex_func@{buf_node}@{len_node}'
tokens[tokens_tuple_map[i]] = tuple(token_list)
tokens[tokens_tuple_map[i]] = token._replace(conv_char=f'@hex_func@{buf_node}@{len_node}')
log_info['argu_tokens'] = tokens
@@ -440,9 +435,9 @@ class LogCompressor:
raise ValueError(f'Unsupported node in concatenated string: {child.type}')
return ''.join(parts)
def _can_be_hexified(self, token: tuple[int, int, str, str, str, str, str], node: Node) -> bool:
def _can_be_hexified(self, token: FormatToken, node: Node) -> bool:
"""Determine if a node can be represented in hex format."""
if token[-1] != 's':
if token.conv_char != 's':
return True
if node.type == 'identifier' and node.text.decode('utf-8') in FUNC_MACROS:
@@ -502,7 +497,7 @@ class LogCompressor:
if log_info['hexify']:
# Count of arguments that are not special (__func__, __LINE__, etc.)
arg_tokens = [t for t in log_info['argu_tokens'] if isinstance(t, tuple)]
arg_tokens = [t for t in log_info['argu_tokens'] if isinstance(t, FormatToken)]
arg_count = len(arg_tokens)
arguments = []
sizes = []
@@ -515,23 +510,23 @@ class LogCompressor:
[a[0] for a in log_info['arguments'][1:]],
):
# Skip special tokens
if token[6] in ('@func', '@line'):
if token.conv_char in ('@func', '@line'):
arg_count -= 1
continue
# Handle hex function
if token[6].startswith('@hex_func'):
if token.conv_char.startswith('@hex_func'):
if not hex_func:
hex_func = []
hex_func.append(token[6])
hex_func.append(token.conv_char)
arg_count -= 1
continue
arguments.append(argument)
if token[6] == 'f' or token[5] == 'll': # float or long long
if token.conv_char == 'f' or token.length == 'll': # float or long long
sizes.append(f'{int(ARG_SIZE_TYPE.U64)}')
elif token[6] == 's':
elif token.conv_char == 's':
sizes.append(f'{int(ARG_SIZE_TYPE.STR)}')
else:
sizes.append(f'{int(ARG_SIZE_TYPE.U32)}')
@@ -626,16 +621,16 @@ class LogCompressor:
simple_fmt_list: list[str] = []
hex_buffer_cnt = 0
for token in log['argu_tokens']:
if isinstance(token, tuple):
if '@func' in token[6] or '@line' in token[6]:
if isinstance(token, FormatToken):
if '@func' in token.conv_char or '@line' in token.conv_char:
continue
if '@hex_func' in token[6]:
if '@hex_func' in token.conv_char:
simple_fmt_list.append(f'@hex_buffer{hex_buffer_cnt}')
no_buf_fmt += f'@hex_buffer{hex_buffer_cnt}'
hex_buffer_cnt += 1
continue
simple_fmt_list.append(token[2])
no_buf_fmt += token[2]
simple_fmt_list.append(token.full_spec)
no_buf_fmt += token.full_spec
else:
no_buf_fmt += token
simple_fmt_str = ' '.join(simple_fmt_list) if simple_fmt_list else None

View File

@@ -8,10 +8,24 @@ Parses C-style format strings and handles argument formatting for log compressio
"""
import struct
from typing import NamedTuple
from typing import Union
def parse_format_string(format_str: str) -> list[Union[str, tuple[int, int, str, str, str, str, str, str]]]:
class FormatToken(NamedTuple):
"""Parsed C format specifier, e.g. ``%08llx`` -> FormatToken(start, end, '%08llx', '0', '8', '', 'll', 'x')."""
start: int # index of '%' in the source string
end: int # index one-past the conversion char
full_spec: str # the raw specifier text, e.g. '%08llx'
flags: str # '-', '+', ' ', '#', '0', or ''
width: str # e.g. '10', '*', or ''
precision: str # e.g. '.2' content (without '.'), '*', or ''
length: str # 'h', 'hh', 'l', 'll', 'j', 'z', 't', or ''
conv_char: str # 'd', 'i', 'u', 'o', 'x', 'X', 'f', 's', etc.
def parse_format_string(format_str: str) -> list[Union[str, FormatToken]]:
"""
Parse a format string into tokens.
@@ -19,10 +33,9 @@ def parse_format_string(format_str: str) -> list[Union[str, tuple[int, int, str,
format_str: C-style format string
Returns:
List of tokens (strings or format spec tuples)
Tuple format: (start, end, full_spec, flags, width, precision, length, conv_char)
List of tokens (literal strings or FormatToken named-tuples)
"""
tokens: list[Union[str, tuple[int, int, str, str, str, str, str, str]]] = []
tokens: list[Union[str, FormatToken]] = []
i = 0
n = len(format_str)
@@ -85,7 +98,7 @@ def parse_format_string(format_str: str) -> list[Union[str, tuple[int, int, str,
conv_char = format_str[i]
i += 1
full_spec = format_str[start:i]
tokens.append((start, i, full_spec, flags, width, precision, length, conv_char))
tokens.append(FormatToken(start, i, full_spec, flags, width, precision, length, conv_char))
else:
# Invalid format spec, treat as literal text
tokens.append(format_str[start:i])
@@ -216,10 +229,8 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str:
arg_index = 0
for token in tokens:
if isinstance(token, tuple):
start, end, full_spec, flags, width, precision, length_mod, conv_char = token
if conv_char == '%':
if isinstance(token, FormatToken):
if token.conv_char == '%':
output.append('%')
else:
if arg_index >= len(args):
@@ -229,19 +240,19 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str:
arg_index += 1
# Character type
if conv_char == 'c':
if token.conv_char == 'c':
# Pad to 4 bytes for unpacking
padded = arg_bytes.ljust(4, b'\x00')
char_code = struct.unpack('>I', padded)[0]
output.append(chr(char_code))
# Pointer type
elif conv_char == 'p':
elif token.conv_char == 'p':
ptr_value = int.from_bytes(arg_bytes, 'big', signed=False)
output.append(hex(ptr_value))
# Floating point types
elif conv_char in 'fFeEgGaA':
elif token.conv_char in 'fFeEgGaA':
if len(arg_bytes) == 4:
float_value = struct.unpack('>f', arg_bytes)[0]
elif len(arg_bytes) == 8:
@@ -251,13 +262,13 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str:
output.append(str(float_value))
# Integer types
elif conv_char in 'diuoxX':
signed = conv_char in 'di'
elif token.conv_char in 'diuoxX':
signed = token.conv_char in 'di'
# Determine expected size
if length_mod == 'll':
if token.length == 'll':
expected_size = 8
elif length_mod in ('l', 'z', 'j', 't') or conv_char == 'p':
elif token.length in ('l', 'z', 'j', 't') or token.conv_char == 'p':
expected_size = 4
else:
expected_size = len(arg_bytes)
@@ -275,9 +286,9 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str:
# Convert to integer
int_value = int.from_bytes(arg_bytes, 'big', signed=signed)
output.append(format_integer(int_value, conv_char, flags, width, length_mod))
output.append(format_integer(int_value, token.conv_char, token.flags, token.width, token.length))
else:
raise ValueError(f'Unsupported conversion: {conv_char}')
raise ValueError(f'Unsupported conversion: {token.conv_char}')
else:
output.append(token)