diff --git a/components/bt/common/ble_log/extension/log_compression/scripts/ble_log_compress.py b/components/bt/common/ble_log/extension/log_compression/scripts/ble_log_compress.py index 211f10af21d..7f046e04934 100644 --- a/components/bt/common/ble_log/extension/log_compression/scripts/ble_log_compress.py +++ b/components/bt/common/ble_log/extension/log_compression/scripts/ble_log_compress.py @@ -38,6 +38,7 @@ from typing import cast import tree_sitter_c as tsc import yaml +from c_format_parse import FormatToken from c_format_parse import parse_format_string from inttypes_map import TYPES_MACRO_MAP from LogDBManager import LogDBManager @@ -354,14 +355,14 @@ class LogCompressor: tokens_tuple_map: list[int] = [] need_args = 0 for idx, tk in enumerate(tokens): - if isinstance(tk, tuple): + if isinstance(tk, FormatToken): tokens_tuple_map.append(idx) need_args = need_args + 1 - if tk[4] == '*': # dynamic width + if tk.width == '*': # dynamic width need_args = need_args + 1 log_info['hexify'] = False return log_info - if tk[5] == '*': # dynamic precision + if tk.precision == '*': # dynamic precision need_args = need_args + 1 log_info['hexify'] = False return log_info @@ -376,7 +377,7 @@ class LogCompressor: raise SyntaxError(f'LogSyntaxError:{node.text.decode("utf-8")}') # Process each argument - for i, (token, arg_node) in enumerate(zip([t for t in tokens if isinstance(t, tuple)], arguments)): + for i, (token, arg_node) in enumerate(zip([t for t in tokens if isinstance(t, FormatToken)], arguments)): arg_text = arg_node.text.decode('utf-8') log_info['arguments'].append((arg_text, arg_node.start_byte, arg_node.end_byte)) @@ -386,13 +387,9 @@ class LogCompressor: # Handle special identifiers if arg_text in FUNC_MACROS: - token_list = list(token) - token_list[6] = '@func' # Modify conversion char to special marker - tokens[tokens_tuple_map[i]] = tuple(token_list) + tokens[tokens_tuple_map[i]] = token._replace(conv_char='@func') elif arg_text in LINE_MACROS: - token_list = list(token) - token_list[6] = '@line' - tokens[tokens_tuple_map[i]] = tuple(token_list) + tokens[tokens_tuple_map[i]] = token._replace(conv_char='@line') # Handle hex functions if ( @@ -410,9 +407,7 @@ class LogCompressor: len_node = abs(hex_func_info[2]) else: len_node = hex_args.named_children[hex_func_info[2]].text.decode('utf-8') - token_list = list(token) - token_list[6] = f'@hex_func@{buf_node}@{len_node}' - tokens[tokens_tuple_map[i]] = tuple(token_list) + tokens[tokens_tuple_map[i]] = token._replace(conv_char=f'@hex_func@{buf_node}@{len_node}') log_info['argu_tokens'] = tokens @@ -440,9 +435,9 @@ class LogCompressor: raise ValueError(f'Unsupported node in concatenated string: {child.type}') return ''.join(parts) - def _can_be_hexified(self, token: tuple[int, int, str, str, str, str, str], node: Node) -> bool: + def _can_be_hexified(self, token: FormatToken, node: Node) -> bool: """Determine if a node can be represented in hex format.""" - if token[-1] != 's': + if token.conv_char != 's': return True if node.type == 'identifier' and node.text.decode('utf-8') in FUNC_MACROS: @@ -502,7 +497,7 @@ class LogCompressor: if log_info['hexify']: # Count of arguments that are not special (__func__, __LINE__, etc.) - arg_tokens = [t for t in log_info['argu_tokens'] if isinstance(t, tuple)] + arg_tokens = [t for t in log_info['argu_tokens'] if isinstance(t, FormatToken)] arg_count = len(arg_tokens) arguments = [] sizes = [] @@ -515,23 +510,23 @@ class LogCompressor: [a[0] for a in log_info['arguments'][1:]], ): # Skip special tokens - if token[6] in ('@func', '@line'): + if token.conv_char in ('@func', '@line'): arg_count -= 1 continue # Handle hex function - if token[6].startswith('@hex_func'): + if token.conv_char.startswith('@hex_func'): if not hex_func: hex_func = [] - hex_func.append(token[6]) + hex_func.append(token.conv_char) arg_count -= 1 continue arguments.append(argument) - if token[6] == 'f' or token[5] == 'll': # float or long long + if token.conv_char == 'f' or token.length == 'll': # float or long long sizes.append(f'{int(ARG_SIZE_TYPE.U64)}') - elif token[6] == 's': + elif token.conv_char == 's': sizes.append(f'{int(ARG_SIZE_TYPE.STR)}') else: sizes.append(f'{int(ARG_SIZE_TYPE.U32)}') @@ -626,16 +621,16 @@ class LogCompressor: simple_fmt_list: list[str] = [] hex_buffer_cnt = 0 for token in log['argu_tokens']: - if isinstance(token, tuple): - if '@func' in token[6] or '@line' in token[6]: + if isinstance(token, FormatToken): + if '@func' in token.conv_char or '@line' in token.conv_char: continue - if '@hex_func' in token[6]: + if '@hex_func' in token.conv_char: simple_fmt_list.append(f'@hex_buffer{hex_buffer_cnt}') no_buf_fmt += f'@hex_buffer{hex_buffer_cnt}' hex_buffer_cnt += 1 continue - simple_fmt_list.append(token[2]) - no_buf_fmt += token[2] + simple_fmt_list.append(token.full_spec) + no_buf_fmt += token.full_spec else: no_buf_fmt += token simple_fmt_str = ' '.join(simple_fmt_list) if simple_fmt_list else None diff --git a/components/bt/common/ble_log/extension/log_compression/scripts/c_format_parse.py b/components/bt/common/ble_log/extension/log_compression/scripts/c_format_parse.py index 366a9133036..c0599db6408 100644 --- a/components/bt/common/ble_log/extension/log_compression/scripts/c_format_parse.py +++ b/components/bt/common/ble_log/extension/log_compression/scripts/c_format_parse.py @@ -8,10 +8,24 @@ Parses C-style format strings and handles argument formatting for log compressio """ import struct +from typing import NamedTuple from typing import Union -def parse_format_string(format_str: str) -> list[Union[str, tuple[int, int, str, str, str, str, str, str]]]: +class FormatToken(NamedTuple): + """Parsed C format specifier, e.g. ``%08llx`` -> FormatToken(start, end, '%08llx', '0', '8', '', 'll', 'x').""" + + start: int # index of '%' in the source string + end: int # index one-past the conversion char + full_spec: str # the raw specifier text, e.g. '%08llx' + flags: str # '-', '+', ' ', '#', '0', or '' + width: str # e.g. '10', '*', or '' + precision: str # e.g. '.2' content (without '.'), '*', or '' + length: str # 'h', 'hh', 'l', 'll', 'j', 'z', 't', or '' + conv_char: str # 'd', 'i', 'u', 'o', 'x', 'X', 'f', 's', etc. + + +def parse_format_string(format_str: str) -> list[Union[str, FormatToken]]: """ Parse a format string into tokens. @@ -19,10 +33,9 @@ def parse_format_string(format_str: str) -> list[Union[str, tuple[int, int, str, format_str: C-style format string Returns: - List of tokens (strings or format spec tuples) - Tuple format: (start, end, full_spec, flags, width, precision, length, conv_char) + List of tokens (literal strings or FormatToken named-tuples) """ - tokens: list[Union[str, tuple[int, int, str, str, str, str, str, str]]] = [] + tokens: list[Union[str, FormatToken]] = [] i = 0 n = len(format_str) @@ -85,7 +98,7 @@ def parse_format_string(format_str: str) -> list[Union[str, tuple[int, int, str, conv_char = format_str[i] i += 1 full_spec = format_str[start:i] - tokens.append((start, i, full_spec, flags, width, precision, length, conv_char)) + tokens.append(FormatToken(start, i, full_spec, flags, width, precision, length, conv_char)) else: # Invalid format spec, treat as literal text tokens.append(format_str[start:i]) @@ -216,10 +229,8 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str: arg_index = 0 for token in tokens: - if isinstance(token, tuple): - start, end, full_spec, flags, width, precision, length_mod, conv_char = token - - if conv_char == '%': + if isinstance(token, FormatToken): + if token.conv_char == '%': output.append('%') else: if arg_index >= len(args): @@ -229,19 +240,19 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str: arg_index += 1 # Character type - if conv_char == 'c': + if token.conv_char == 'c': # Pad to 4 bytes for unpacking padded = arg_bytes.ljust(4, b'\x00') char_code = struct.unpack('>I', padded)[0] output.append(chr(char_code)) # Pointer type - elif conv_char == 'p': + elif token.conv_char == 'p': ptr_value = int.from_bytes(arg_bytes, 'big', signed=False) output.append(hex(ptr_value)) # Floating point types - elif conv_char in 'fFeEgGaA': + elif token.conv_char in 'fFeEgGaA': if len(arg_bytes) == 4: float_value = struct.unpack('>f', arg_bytes)[0] elif len(arg_bytes) == 8: @@ -251,13 +262,13 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str: output.append(str(float_value)) # Integer types - elif conv_char in 'diuoxX': - signed = conv_char in 'di' + elif token.conv_char in 'diuoxX': + signed = token.conv_char in 'di' # Determine expected size - if length_mod == 'll': + if token.length == 'll': expected_size = 8 - elif length_mod in ('l', 'z', 'j', 't') or conv_char == 'p': + elif token.length in ('l', 'z', 'j', 't') or token.conv_char == 'p': expected_size = 4 else: expected_size = len(arg_bytes) @@ -275,9 +286,9 @@ def parse_compressed_arguments(byte_sequence: bytes, format_str: str) -> str: # Convert to integer int_value = int.from_bytes(arg_bytes, 'big', signed=signed) - output.append(format_integer(int_value, conv_char, flags, width, length_mod)) + output.append(format_integer(int_value, token.conv_char, token.flags, token.width, token.length)) else: - raise ValueError(f'Unsupported conversion: {conv_char}') + raise ValueError(f'Unsupported conversion: {token.conv_char}') else: output.append(token)