CVE-2024-53865
Description
zhmcclient is a pure Python client library for the IBM Z HMC Web Services API. In affected versions the Python package "zhmcclient" writes password-like properties in clear text into its HMC and API logs in the following cases: 1. The 'boot-ftp-password' and 'ssc-master-pw' properties when creating or updating a partition in DPM mode, in the zhmcclient API and HMC logs. 2. The 'ssc-master-pw' and 'zaware-master-pw' properties when updating an LPAR in classic mode, in the zhmcclient API and HMC logs. 3. The 'ssc-master-pw' and 'zaware-master-pw' properties when creating or updating an image activation profile in classic mode, in the zhmcclient API and HMC logs. 4. The 'password' property when creating or updating an HMC user, in the zhmcclient API log. 5. The 'bind-password' property when creating or updating an LDAP server definition, in the zhmcclient API and HMC logs. This issue affects only users of the zhmcclient package that have enabled the Python loggers named "zhmcclient.api" (for the API log) or "zhmcclient.hmc" (for the HMC log) and that use the functions listed above. This issue has been fixed in zhmcclient version 1.18.1. Users are advised to upgrade. There are no known workarounds for this vulnerability.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
zhmcclientPyPI | < 1.18.1 | 1.18.1 |
Patches
2791d4bb00c01ad32781e782dFixed clear-text logging of all password-like properties in API/HMC log
16 files changed · +455 −136
changes/noissue.1.feature.rst+3 −0 added@@ -0,0 +1,3 @@ +Dev: Enhanced the zhmcclient API logging code so that in the debugger, +zhmcclient API functions now have less logging steps to go through until the +actual API function is reached.
changes/noissue.4.fix.rst+4 −0 added@@ -0,0 +1,4 @@ +Fixed that all password-like properties are no longer written in clear text to +the Python loggers "zhmcclient.api" and "zhmcclient.hmc", but are now blanked +out. Previously, that was done only for the "zhmcclient.hmc" logger for creation +and update of HMC users.
tests/unit/zhmcclient/test_activation_profile.py+18 −2 modified@@ -19,9 +19,10 @@ import copy import re +import logging import pytest -from zhmcclient import Client, ActivationProfile +from zhmcclient import Client, ActivationProfile, BLANKED_OUT_STRING from zhmcclient_mock import FakedSession from tests.common.utils import assert_resources @@ -338,11 +339,15 @@ def test_profile_repr(self): }}, {'group-profile-uri': None}, {'zaware-gateway-info': None}, + {'ssc-master-pw': 'bla', 'zaware-master-pw': 'bla'}, ] ) - def test_profile_update_properties(self, input_props, profile_type): + def test_profile_update_properties(self, caplog, input_props, profile_type): """Test ActivationProfile.update_properties().""" + logger_name = "zhmcclient.api" + caplog.set_level(logging.DEBUG, logger=logger_name) + mgr_attr = profile_type + '_activation_profiles' profile_mgr = getattr(self.cpc, mgr_attr) @@ -354,6 +359,9 @@ def test_profile_update_properties(self, input_props, profile_type): # Execute the code to be tested profile.update_properties(properties=input_props) + # Get its API call log record + call_record = caplog.records[-2] + # Verify that the resource object already reflects the property # updates. for prop_name in saved_properties: @@ -376,3 +384,11 @@ def test_profile_update_properties(self, input_props, profile_type): assert prop_name in profile.properties prop_value = profile.properties[prop_name] assert prop_value == exp_prop_value + + # Verify the API call log record for blanked-out properties. + if 'ssc-master-pw' in input_props: + exp_str = f"'ssc-master-pw': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + if 'zaware-master-pw' in input_props: + exp_str = f"'zaware-master-pw': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0
tests/unit/zhmcclient/test_ldap_server_definition.py+34 −3 modified@@ -19,9 +19,11 @@ import re import copy +import logging import pytest -from zhmcclient import Client, HTTPError, NotFound, LdapServerDefinition +from zhmcclient import Client, HTTPError, NotFound, LdapServerDefinition, \ + BLANKED_OUT_STRING from zhmcclient_mock import FakedSession from tests.common.utils import assert_resources @@ -149,12 +151,21 @@ def test_ldap_srv_def_manager_list( 'search-distinguished-name': 'test{0}'}, ['element-uri', 'name', 'description'], None), + ({'name': 'a', + 'primary-hostname-ipaddr': '10.11.12.13', + 'search-distinguished-name': 'test{0}', + 'bind-password': 'bla'}, + ['element-uri', 'name', 'bind-password'], + None), ] ) def test_ldap_srv_def_manager_create( - self, input_props, exp_prop_names, exp_exc): + self, caplog, input_props, exp_prop_names, exp_exc): """Test LdapServerDefinitionManager.create().""" + logger_name = "zhmcclient.api" + caplog.set_level(logging.DEBUG, logger=logger_name) + ldap_srv_def_mgr = self.console.ldap_server_definitions if exp_exc is not None: @@ -174,6 +185,9 @@ def test_ldap_srv_def_manager_create( # Execute the code to be tested. ldap_srv_def = ldap_srv_def_mgr.create(properties=input_props) + # Get its API call log record + call_record = caplog.records[-2] + # Check the resource for consistency within itself assert isinstance(ldap_srv_def, LdapServerDefinition) ldap_srv_def_name = ldap_srv_def.name @@ -191,6 +205,11 @@ def test_ldap_srv_def_manager_create( exp_value = input_props[prop_name] assert value == exp_value + # Verify the API call log record for blanked-out properties. + if 'bind-password' in input_props: + exp_str = f"'bind-password': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + def test_ldap_srv_def_repr(self): """Test LdapServerDefinition.__repr__().""" @@ -287,11 +306,15 @@ def test_ldap_delete_create_same(self): "input_props", [ {}, {'description': 'New LDAP Server Definition description'}, + {'bind-password': 'bla'}, ] ) - def test_ldap_srv_def_update_properties(self, input_props): + def test_ldap_srv_def_update_properties(self, caplog, input_props): """Test LdapServerDefinition.update_properties().""" + logger_name = "zhmcclient.api" + caplog.set_level(logging.DEBUG, logger=logger_name) + ldap_srv_def_name = 'faked_a' # Add the LDAP Server Definition to be tested @@ -306,6 +329,9 @@ def test_ldap_srv_def_update_properties(self, input_props): # Execute the code to be tested ldap_srv_def.update_properties(properties=input_props) + # Get its API call log record + call_record = caplog.records[-2] + # Verify that the resource object already reflects the property # updates. for prop_name in saved_properties: @@ -329,3 +355,8 @@ def test_ldap_srv_def_update_properties(self, input_props): assert prop_name in ldap_srv_def.properties prop_value = ldap_srv_def.properties[prop_name] assert prop_value == exp_prop_value + + # Verify the API call log record for blanked-out properties. + if 'bind-password' in input_props: + exp_str = f"'bind-password': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0
tests/unit/zhmcclient/test_logging.py+78 −18 modified@@ -23,18 +23,26 @@ from testfixtures import LogCapture from zhmcclient._logging import logged_api_call, get_logger - +from zhmcclient._constants import BLANKED_OUT_STRING # # Various uses of the @logged_api_call decorator # + @logged_api_call def decorated_global_function(): """A decorated function at the global (module) level.""" pass +@logged_api_call(blanked_properties=['hideme'], properties_pos=0) +def decorated_global_props_function(properties): + """A decorated function with properties at the global (module) level, + where the 'hideme' property is blanked out in the API log.""" + return properties + + def global1_function(): """An undecorated function at the global (module) level.""" @@ -136,8 +144,8 @@ def call_from_global(func, *args, **kwargs): # Some expected values that are constant _EXP_LOGGER_NAME = 'zhmcclient.api' _EXP_LOG_LEVEL = 'DEBUG' -_EXP_LOG_MSG_ENTER_PATTERN = "Called: .*, args: .*, kwargs: .*" -_EXP_LOG_MSG_LEAVE_PATTERN = "Return: .*, result: .*" +_EXP_LOG_MSG_ENTER_PATTERN = "Called: (.*), args: (.*), kwargs: (.*)" +_EXP_LOG_MSG_LEAVE_PATTERN = "Return: (.*), result: (.*)" @pytest.fixture() @@ -155,9 +163,9 @@ def capture(): # Test cases # -def assert_log_capture(log_capture, exp_apifunc): - # pylint: disable=unused-argument - # Note: exp_apifunc is shown when pytest displays a traceback. +def assert_log_capture( + log_capture, *, func_pattern=None, args_pattern=None, + kwargs_pattern=None, return_pattern=None): """ Assert that the log capture is as expected. """ @@ -167,13 +175,23 @@ def assert_log_capture(log_capture, exp_apifunc): assert enter_record.name == _EXP_LOGGER_NAME assert enter_record.levelname == _EXP_LOG_LEVEL assert re.match(_EXP_LOG_MSG_ENTER_PATTERN, enter_record.msg) - # We don't check the function name and its pos and kw args + func_str, args_str, kwargs_str = enter_record.args + if func_pattern: + assert re.search(func_pattern, func_str) + if args_pattern: + assert re.search(args_pattern, args_str) + if kwargs_pattern: + assert re.search(kwargs_pattern, kwargs_str) leave_record = log_capture.records[1] assert leave_record.name == _EXP_LOGGER_NAME assert leave_record.levelname == _EXP_LOG_LEVEL assert re.match(_EXP_LOG_MSG_LEAVE_PATTERN, leave_record.msg) - # We don't check the function name and its pos and kw args + func_str, return_str = leave_record.args + if func_pattern: + assert re.search(func_pattern, func_str) + if return_pattern: + assert re.search(return_pattern, return_str) def test_1a_global_from_global(capture): @@ -183,7 +201,8 @@ def test_1a_global_from_global(capture): call_from_global(decorated_global_function) - assert_log_capture(capture, 'decorated_global_function()') + assert_log_capture( + capture, func_pattern='decorated_global_function') def test_1b_global_from_method(capture): @@ -192,7 +211,40 @@ def test_1b_global_from_method(capture): CallerClass().call_from_method(decorated_global_function) - assert_log_capture(capture, 'decorated_global_function()') + assert_log_capture( + capture, func_pattern='decorated_global_function') + + +def test_1c_global_props_args_from_global(capture): + # pylint: disable=redefined-outer-name + """Simple test calling a decorated global function with properties as args, + from a global function.""" + props = { + 'prop1': 'value1', + 'hideme': 'secret', + } + blanked_props = { + 'prop1': 'value1', + 'hideme': BLANKED_OUT_STRING, + } + call_from_global(decorated_global_props_function, props) + + assert_log_capture( + capture, func_pattern='decorated_global_props_function', + args_pattern=re.escape(str(blanked_props)), + return_pattern=re.escape(str(props))) + + +def test_1c_global_props_kwargs_from_global(capture): + # pylint: disable=redefined-outer-name + """Simple test calling a decorated global function with properties as + kwargs, from a global function.""" + + call_from_global(decorated_global_props_function, + properties={'prop1': 'value1'}) + + assert_log_capture( + capture, func_pattern='decorated_global_props_function') def test_2a_global_inner1_from_global(capture): @@ -204,7 +256,8 @@ def test_2a_global_inner1_from_global(capture): call_from_global(decorated_inner1_function) - assert_log_capture(capture, 'global1_function.decorated_inner1_function()') + assert_log_capture( + capture, func_pattern='global1_function.decorated_inner1_function') def test_2b_global_inner1_from_method(capture): @@ -216,7 +269,8 @@ def test_2b_global_inner1_from_method(capture): CallerClass().call_from_method(decorated_inner1_function) - assert_log_capture(capture, 'global1_function.decorated_inner1_function()') + assert_log_capture( + capture, func_pattern='global1_function.decorated_inner1_function') def test_3a_global_inner2_from_global(capture): @@ -228,7 +282,8 @@ def test_3a_global_inner2_from_global(capture): call_from_global(decorated_inner2_function) - assert_log_capture(capture, 'inner1_function.decorated_inner2_function()') + assert_log_capture( + capture, func_pattern='inner1_function.decorated_inner2_function') def test_3b_global_inner1_from_method(capture): @@ -240,7 +295,8 @@ def test_3b_global_inner1_from_method(capture): CallerClass().call_from_method(decorated_inner2_function) - assert_log_capture(capture, 'inner1_function.decorated_inner2_function()') + assert_log_capture( + capture, func_pattern='inner1_function.decorated_inner2_function') def test_4a_method_from_global(capture): @@ -252,7 +308,8 @@ def test_4a_method_from_global(capture): call_from_global(decorated_method, d) - assert_log_capture(capture, 'Decorator1Class.decorated_method()') + assert_log_capture( + capture, func_pattern='Decorator1Class.decorated_method') def test_4b_method_from_method(capture): @@ -264,7 +321,8 @@ def test_4b_method_from_method(capture): CallerClass().call_from_method(decorated_method, d) - assert_log_capture(capture, 'Decorator1Class.decorated_method()') + assert_log_capture( + capture, func_pattern='Decorator1Class.decorated_method') def test_5a_method_from_global(capture): @@ -277,7 +335,8 @@ def test_5a_method_from_global(capture): call_from_global(decorated_inner_function) - assert_log_capture(capture, 'method.decorated_inner_function()') + assert_log_capture( + capture, func_pattern='method.decorated_inner_function') def test_5b_method_from_method(capture): @@ -290,7 +349,8 @@ def test_5b_method_from_method(capture): CallerClass().call_from_method(decorated_inner_function) - assert_log_capture(capture, 'method.decorated_inner_function()') + assert_log_capture( + capture, func_pattern='method.decorated_inner_function') def test_decorated_class():
tests/unit/zhmcclient/test_lpar.py+22 −4 modified@@ -19,11 +19,13 @@ import re import copy +import logging from unittest import mock import pytest import requests_mock -from zhmcclient import Client, Lpar, HTTPError, StatusTimeout, Job +from zhmcclient import Client, Lpar, HTTPError, StatusTimeout, Job, \ + BLANKED_OUT_STRING from zhmcclient_mock import FakedSession, LparActivateHandler, \ LparDeactivateHandler, LparLoadHandler from tests.common.utils import assert_resources @@ -319,13 +321,18 @@ def test_lpar_repr(self): {'description': 'New lpar description'}, {'acceptable-status': ['operating', 'not-operating'], 'description': 'New lpar description'}, - {'ssc-master-userid': None, - 'ssc-master-pw': None}, + {'ssc-master-userid': 'user', + 'ssc-master-pw': 'bla'}, + {'zaware-master-userid': 'user', + 'zaware-master-pw': 'bla'}, ] ) - def test_lpar_update_properties(self, input_props, lpar_name): + def test_lpar_update_properties(self, caplog, input_props, lpar_name): """Test Lpar.update_properties().""" + logger_name = "zhmcclient.api" + caplog.set_level(logging.DEBUG, logger=logger_name) + # Add faked lpars self.add_lpar1() self.add_lpar2() @@ -339,6 +346,9 @@ def test_lpar_update_properties(self, input_props, lpar_name): # Execute the code to be tested lpar.update_properties(properties=input_props) + # Get its API call log record + call_record = caplog.records[-2] + # Verify that the resource object already reflects the property # updates. for prop_name in saved_properties: @@ -362,6 +372,14 @@ def test_lpar_update_properties(self, input_props, lpar_name): prop_value = lpar.properties[prop_name] assert prop_value == exp_prop_value + # Verify the API call log record for blanked-out properties. + if 'ssc-master-pw' in input_props: + exp_str = f"'ssc-master-pw': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + if 'zaware-master-pw' in input_props: + exp_str = f"'zaware-master-pw': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + @pytest.mark.parametrize( "initial_profile, profile_kwargs, exp_profile, exp_profile_exc", [ ('', {},
tests/unit/zhmcclient/test_partition.py+55 −7 modified@@ -19,9 +19,11 @@ import re import copy +import logging import pytest -from zhmcclient import Client, Partition, HTTPError, NotFound +from zhmcclient import Client, Partition, HTTPError, NotFound, \ + BLANKED_OUT_STRING from zhmcclient_mock import FakedSession from tests.common.utils import assert_resources @@ -307,23 +309,43 @@ def test_pm_list_add_props( ({'name': 'fake-part-x', 'ifl-processors': 2, 'initial-memory': 4096, - 'maximum-memory': 4096}, + 'maximum-memory': 4096, + 'description': 'fake description X'}, ['object-uri', 'name', 'initial-memory', 'maximum-memory', - 'ifl-processors'], + 'ifl-processors', 'description'], None), ({'name': 'fake-part-x', 'ifl-processors': 2, 'initial-memory': 4096, 'maximum-memory': 4096, - 'description': 'fake description X'}, + 'boot-device': 'ftp', + 'boot-ftp-host': 'host', + 'boot-ftp-username': 'user', + 'boot-ftp-password': 'bla', + 'boot-ftp-insfile': 'ins'}, ['object-uri', 'name', 'initial-memory', 'maximum-memory', - 'ifl-processors', 'description'], + 'ifl-processors', 'boot-device', 'boot-ftp-host', + 'boot-ftp-username', 'boot-ftp-insfile'], + None), + ({'name': 'fake-part-x', + 'ifl-processors': 2, + 'initial-memory': 4096, + 'maximum-memory': 4096, + 'type': 'ssc', + 'ssc-host-name': 'host', + 'ssc-master-userid': 'user', + 'ssc-master-pw': 'bla'}, + ['object-uri', 'name', 'initial-memory', 'maximum-memory', + 'ifl-processors', 'type', 'ssc-host-name', 'ssc-master-userid'], None), ] ) - def test_pm_create(self, input_props, exp_prop_names, exp_exc): + def test_pm_create(self, caplog, input_props, exp_prop_names, exp_exc): """Test PartitionManager.create().""" + logger_name = "zhmcclient.api" + caplog.set_level(logging.DEBUG, logger=logger_name) + partition_mgr = self.cpc.partitions if exp_exc is not None: @@ -345,6 +367,9 @@ def test_pm_create(self, input_props, exp_prop_names, exp_exc): # the input properties plus 'object-uri'. partition = partition_mgr.create(properties=input_props) + # Get its API call log record + call_record = caplog.records[-2] + # Check the resource for consistency within itself assert isinstance(partition, Partition) partition_name = partition.name @@ -362,6 +387,14 @@ def test_pm_create(self, input_props, exp_prop_names, exp_exc): exp_value = input_props[prop_name] assert value == exp_value + # Verify the API call log record for blanked-out properties. + if 'boot-ftp-password' in input_props: + exp_str = f"'boot-ftp-password': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + if 'ssc-master-pw' in input_props: + exp_str = f"'ssc-master-pw': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + def test_pm_resource_object(self): """ Test PartitionManager.resource_object(). @@ -673,9 +706,13 @@ def test_partition_feature_info( 'ssc-master-pw': None}, ] ) - def test_partition_update_properties(self, input_props, partition_name): + def test_partition_update_properties( + self, caplog, input_props, partition_name): """Test Partition.update_properties().""" + logger_name = "zhmcclient.api" + caplog.set_level(logging.DEBUG, logger=logger_name) + # Add faked partitions self.add_partition1() self.add_partition2() @@ -689,6 +726,9 @@ def test_partition_update_properties(self, input_props, partition_name): # Execute the code to be tested partition.update_properties(properties=input_props) + # Get its API call log record + call_record = caplog.records[-2] + # Verify that the resource object already reflects the property # updates. for prop_name in saved_properties: @@ -712,6 +752,14 @@ def test_partition_update_properties(self, input_props, partition_name): prop_value = partition.properties[prop_name] assert prop_value == exp_prop_value + # Verify the API call log record for blanked-out properties. + if 'boot-ftp-password' in input_props: + exp_str = f"'boot-ftp-password': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + if 'ssc-master-pw' in input_props: + exp_str = f"'ssc-master-pw': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + def test_partition_update_name(self): """ Test Partition.update_properties() with 'name' property.
tests/unit/zhmcclient/test_user.py+15 −2 modified@@ -19,9 +19,10 @@ import re import copy +import logging import pytest -from zhmcclient import Client, HTTPError, NotFound, User +from zhmcclient import Client, HTTPError, NotFound, User, BLANKED_OUT_STRING from zhmcclient_mock import FakedSession from tests.common.utils import assert_resources @@ -180,9 +181,13 @@ def test_user_manager_list( None), ] ) - def test_user_manager_create(self, input_props, exp_prop_names, exp_exc): + def test_user_manager_create( + self, caplog, input_props, exp_prop_names, exp_exc): """Test UserManager.create().""" + logger_name = "zhmcclient.api" + caplog.set_level(logging.DEBUG, logger=logger_name) + user_mgr = self.console.users if exp_exc is not None: @@ -202,6 +207,9 @@ def test_user_manager_create(self, input_props, exp_prop_names, exp_exc): # Execute the code to be tested. user = user_mgr.create(properties=input_props) + # Get its API call log record + call_record = caplog.records[-2] + # Check the resource for consistency within itself assert isinstance(user, User) user_name = user.name @@ -219,6 +227,11 @@ def test_user_manager_create(self, input_props, exp_prop_names, exp_exc): exp_value = input_props[prop_name] assert value == exp_value + # Verify the API call log record for blanked-out properties. + if 'password' in input_props: + exp_str = f"'password': '{BLANKED_OUT_STRING}'" + assert call_record.message.find(exp_str) > 0 + def test_user_repr(self): """Test User.__repr__()."""
zhmcclient/_activation_profile.py+4 −2 modified@@ -224,7 +224,8 @@ def list(self, full_properties=False, filter_args=None, list_uri, result_prop, full_properties, filter_args, additional_properties) - @logged_api_call + @logged_api_call(blanked_properties=['ssc-master-pw', 'zaware-master-pw'], + properties_pos=1) def create(self, properties): """ Create and configure an Activation Profiles on this CPC, of the profile @@ -344,7 +345,8 @@ def delete(self): self.get_properties_local(self.manager._name_prop, None)) self.cease_existence_local() - @logged_api_call + @logged_api_call(blanked_properties=['ssc-master-pw', 'zaware-master-pw'], + properties_pos=1) def update_properties(self, properties): """ Update writeable properties of this Activation Profile.
zhmcclient/_constants.py+6 −1 modified@@ -50,7 +50,8 @@ 'HTML_REASON_WEB_SERVICES_DISABLED', 'HTML_REASON_OTHER', 'STOMP_MIN_CONNECTION_CHECK_TIME', - 'DEFAULT_WS_TIMEOUT'] + 'DEFAULT_WS_TIMEOUT', + 'BLANKED_OUT_STRING'] #: Default HTTP connect timeout in seconds, @@ -187,3 +188,7 @@ #: Default WebSocket connect and receive timeout in seconds, for interacting #: with the :class:`zhmcclient.OSConsole` class. DEFAULT_WS_TIMEOUT = 5 + +#: Replacement string for blanked out sensitive values in log entries, such as +#: passwords or session tokens. +BLANKED_OUT_STRING = '********'
zhmcclient/_ldap_server_definition.py+2 −2 modified@@ -151,7 +151,7 @@ def list(self, full_properties=False, filter_args=None): return self._list_with_operation( list_uri, result_prop, full_properties, filter_args, None) - @logged_api_call + @logged_api_call(blanked_properties=['bind-password'], properties_pos=1) def create(self, properties): """ Create a new LDAP Server Definition in this HMC. @@ -257,7 +257,7 @@ def delete(self): self.get_properties_local(self.manager._name_prop, None)) self.cease_existence_local() - @logged_api_call + @logged_api_call(blanked_properties=['bind-password'], properties_pos=1) def update_properties(self, properties): """ Update writeable properties of this LDAP Server Definitions.
zhmcclient/_logging.py+187 −81 modified@@ -75,9 +75,10 @@ import logging import inspect -from decorator import decorate # requires decorator>=4.0 +import functools +from collections.abc import Mapping, Sequence -from ._constants import API_LOGGER_NAME +from ._constants import API_LOGGER_NAME, BLANKED_OUT_STRING __all__ = [] @@ -105,7 +106,8 @@ def get_logger(name): return logger -def logged_api_call(func): +def logged_api_call( + org_func=None, *, blanked_properties=None, properties_pos=None): """ Function decorator that causes the decorated API function or method to log calls to itself to a logger. @@ -115,7 +117,24 @@ def logged_api_call(func): Parameters: - func (function object): The original function being decorated. + org_func (function object): The original function being decorated. + Will be `None` if the decorator is specified with its optional + argument 'blanked_properties'. + + blanked_properties (list of str): Optional: List of properties in the + 'properties' argument of the decorated API function that should be + blanked out before being logged. Can be used to hide password + properties. + This parameter is required when 'properties_pos' is used. + This parameter must be specified as a keyword argument. + + properties_pos (int): Optional: 0-based index of the 'properties' + parameter in the argument list of the decorated API function. + For methods, the 'self' or 'cls' parameter is included in the position. + This parameter is needed in case the properties are passed as a + positional argument by the caller of the API function. + This parameter is required when 'blanked_properties' is used. + This parameter must be specified as a keyword argument. Returns: @@ -128,109 +147,196 @@ def logged_api_call(func): method (and not on top of the @property decorator). """ - # Note that in this decorator function, we are in a module loading context, - # where the decorated functions are being defined. When this decorator - # function is called, its call stack represents the definition of the - # decorated functions. Not all global definitions in the module have been - # defined yet, and methods of classes that are decorated with this - # decorator are still functions at this point (and not yet methods). + if blanked_properties is not None and properties_pos is None: + raise TypeError( + "If the @logged_api_call decorator is specified with " + "'blanked_properties', 'properties_pos' must also be specified.") - module = inspect.getmodule(func) - if not inspect.isfunction(func) or not hasattr(module, '__name__'): - raise TypeError("The @logged_api_call decorator must be used on a " - "function or method (and not on top of the @property " - "decorator)") + if properties_pos is not None and blanked_properties is None: + raise TypeError( + "If the @logged_api_call decorator is specified with " + "'properties_pos', 'blanked_properties' must also be specified.") - try: - # We avoid the use of inspect.getouterframes() because it is slow, - # and use the pointers up the stack frame, instead. + if blanked_properties is not None and ( + not isinstance(blanked_properties, Sequence) or # noqa: W504 + isinstance(blanked_properties, str)): + raise TypeError( + "The 'blanked_properties' parameter of the @logged_api_call " + "decorator must be a list of strings.") - this_frame = inspect.currentframe() # this decorator function here - apifunc_frame = this_frame.f_back # the decorated API function - - apifunc_owner = inspect.getframeinfo(apifunc_frame)[2] + def _decorate(func): + """ + The actual decorator function that always gets the original decorated + function, independent of whether the 'logged_api_call' decorator was + specified with or without its optional arguments. - finally: - # Recommended way to deal with frame objects to avoid ref cycles - del this_frame - del apifunc_frame + Parameters: - # TODO: For inner functions, show all outer levels instead of just one. + func (function object): The original function being decorated. + """ - if apifunc_owner == '<module>': - # The decorated API function is defined globally (at module level) - apifunc_str = f'{func.__name__}()' - else: - # The decorated API function is defined in a class or in a function - apifunc_str = f'{apifunc_owner}.{func.__name__}()' + # Note that in this decorator function, we are in a module loading + # context, where the decorated functions are being defined. When this + # decorator function is called, its call stack represents the + # definition of the decorated functions. Not all global definitions in + # the module have been defined yet, and methods of classes that are + # decorated with this decorator are still functions at this point (and + # not yet methods). - logger = get_logger(API_LOGGER_NAME) + if not inspect.isfunction(func): + raise TypeError("The @logged_api_call decorator must be used on a " + "function or method (and not on top of the " + "@property decorator)") - def is_external_call(): - """ - Return a boolean indicating whether the call to the decorated API - function is an external call (vs. b eing an internal call). - """ try: # We avoid the use of inspect.getouterframes() because it is slow, # and use the pointers up the stack frame, instead. - log_it_frame = inspect.currentframe() # this log_it() function - log_api_call_frame = log_it_frame.f_back # the log_api_call() func - apifunc_frame = log_api_call_frame.f_back # the decorated API func - apicaller_frame = apifunc_frame.f_back # caller of API function - apicaller_module = inspect.getmodule(apicaller_frame) - if apicaller_module is None: - apicaller_module_name = "<unknown>" - else: - apicaller_module_name = apicaller_module.__name__ + this_frame = inspect.currentframe() # this function + apifunc_frame = this_frame.f_back # the decorated API function + if org_func: + # In this case, there is one more decorator function nesting + apifunc_frame = apifunc_frame.f_back + apifunc_owner = inspect.getframeinfo(apifunc_frame)[2] + finally: # Recommended way to deal with frame objects to avoid ref cycles - del log_it_frame - del log_api_call_frame + del this_frame del apifunc_frame - del apicaller_frame - del apicaller_module - # Log only if the caller is not from the zhmcclient package - return apicaller_module_name.split('.')[0] != 'zhmcclient' + # TODO: For inner functions, show all outer levels instead of just one. + + func_name = getattr(func, '__name__', '<unknown>') + if apifunc_owner == '<module>': + # The decorated API function is defined globally (at module level) + apifunc_str = f'{func_name}()' + else: + # The decorated API function is defined in a class or in a function + apifunc_str = f'{apifunc_owner}.{func_name}()' + + logger = get_logger(API_LOGGER_NAME) + + def is_external_call(): + """ + Return a boolean indicating whether the call to the decorated API + function is made from outside of the zhmcclient package. + """ + try: + # We avoid the use of inspect.getouterframes() because it is + # slow, and use the pointers up the stack frame, instead. + + this_frame = inspect.currentframe() # this function + log_api_call_frame = this_frame.f_back # log_api_call() + apifunc_frame = log_api_call_frame.f_back # the decorated func + apicaller_frame = apifunc_frame.f_back # caller of API func + apicaller_module = inspect.getmodule(apicaller_frame) + if apicaller_module is None: + apicaller_module_name = "<unknown>" + else: + apicaller_module_name = apicaller_module.__name__ + finally: + # Recommended way to deal with frame objects to avoid ref + # cycles + del this_frame + del log_api_call_frame + del apifunc_frame + del apicaller_frame + del apicaller_module + + # Log only if the caller is not from the zhmcclient package + return apicaller_module_name.split('.')[0] != 'zhmcclient' + + def blanked_dict(properties): + """ + Return a copy of the properties dict, with blanked out values + according to the 'blanked_properties' and 'properties_pos' + arguments of the 'logged_api_call' decorator. + """ + # properties may also be a DictView (subclass of Mapping) + assert isinstance(properties, Mapping) + copied_properties = dict(properties) + for pn in blanked_properties: + try: + copied_properties[pn] = BLANKED_OUT_STRING + except KeyError: + pass + return copied_properties + + def blanked_args(args, kwargs): + """ + Return a copy of args and kwargs, whereby the 'properties' argument + has items blanked out according to the 'blanked_properties' and + 'properties_pos' arguments of the 'logged_api_call' decorator. + """ + logged_kwargs = dict(kwargs) + logged_args = list(args) + if blanked_properties is not None: + if 'properties' in kwargs: + logged_kwargs['properties'] = \ + blanked_dict(kwargs['properties']) + else: + logged_args[properties_pos] = \ + blanked_dict(args[properties_pos]) + return tuple(logged_args), logged_kwargs + + def log_call(args, kwargs): + """ + Log the call to the API function. + """ + logged_args, logged_kwargs = blanked_args(args, kwargs) + logger.debug("Called: %s, args: %.500s, kwargs: %.500s", + apifunc_str, + log_escaped(repr(logged_args)), + log_escaped(repr(logged_kwargs))) + + def log_return(result): + """ + Log the return from the API function. + """ + logger.debug("Return: %s, result: %.1000s", + apifunc_str, log_escaped(repr(result))) - def log_api_call(func, *args, **kwargs): - """ - Log entry to and exit from the decorated function, at the debug level. + @functools.wraps(func) + def log_api_call(*args, **kwargs): + """ + Log entry to and exit from the decorated function, at the debug + level. - Note that this wrapper function is called every time the decorated - function/method is called, but that the log message only needs to be - constructed when logging for this logger and for this log level is - turned on. Therefore, we do as much as possible in the decorator - function, plus we use %-formatting and lazy interpolation provided by - the log functions, in order to save resources in this function here. + Note that this wrapper function is called every time the decorated + function/method is called, but that the log message only needs to + be constructed when logging for this logger and for this log level + is turned on. Therefore, we do as much as possible in the decorator + function, plus we use %-formatting and lazy interpolation provided + by the log functions, in order to save resources in this function + here. - Parameters: + Parameters: - func (function object): The decorated function. + func (function object): The decorated function. - *args: Any positional arguments for the decorated function. + *args: Any positional arguments for the decorated function. - **kwargs: Any keyword arguments for the decorated function. - """ + **kwargs: Any keyword arguments for the decorated function. + """ - # Note that in this function, we are in the context where the - # decorated function is actually called. + # Note that in this function, we are in the context where the + # decorated function is actually called. + _log_it = is_external_call() and logger.isEnabledFor(logging.DEBUG) - _log_it = is_external_call() and logger.isEnabledFor(logging.DEBUG) + if _log_it: + log_call(args, kwargs) - if _log_it: - logger.debug("Called: %s, args: %.500s, kwargs: %.500s", - apifunc_str, log_escaped(repr(args)), - log_escaped(repr(kwargs))) + result = func(*args, **kwargs) # The zhmcclient function - result = func(*args, **kwargs) + if _log_it: + log_return(result) - if _log_it: - logger.debug("Return: %s, result: %.1000s", - apifunc_str, log_escaped(repr(result))) + return result - return result + return log_api_call - return decorate(func, log_api_call) + # When the logged_api_call decorator is specified with its optional + # arguments, org_func is None + if org_func: + return _decorate(org_func) + return _decorate
zhmcclient/_lpar.py+2 −1 modified@@ -187,7 +187,8 @@ def __init__(self, manager, uri, name=None, properties=None): f"got {type(manager)}") super().__init__(manager, uri, name, properties) - @logged_api_call + @logged_api_call(blanked_properties=['ssc-master-pw', 'zaware-master-pw'], + properties_pos=1) def update_properties(self, properties): """ Update writeable properties of this LPAR.
zhmcclient/_partition.py+4 −2 modified@@ -176,7 +176,8 @@ def list(self, full_properties=False, filter_args=None, list_uri, result_prop, full_properties, filter_args, additional_properties) - @logged_api_call + @logged_api_call(blanked_properties=['boot-ftp-password', 'ssc-master-pw'], + properties_pos=1) def create(self, properties): """ Create and configure a Partition in this CPC. @@ -591,7 +592,8 @@ def delete(self): self.get_properties_local(self.manager._name_prop, None)) self.cease_existence_local() - @logged_api_call + @logged_api_call(blanked_properties=['boot-ftp-password', 'ssc-master-pw'], + properties_pos=1) def update_properties(self, properties): """ Update writeable properties of this Partition.
zhmcclient/_session.py+19 −9 modified@@ -39,7 +39,7 @@ DEFAULT_OPERATION_TIMEOUT, DEFAULT_STATUS_TIMEOUT, \ DEFAULT_NAME_URI_CACHE_TIMETOLIVE, HMC_LOGGER_NAME, \ HTML_REASON_WEB_SERVICES_DISABLED, HTML_REASON_OTHER, \ - DEFAULT_HMC_PORT + DEFAULT_HMC_PORT, BLANKED_OUT_STRING from ._utils import repr_obj_id from ._version import __version__ @@ -54,7 +54,14 @@ 'Accept': '*/*' } -BLANKED_OUT = '********' # Replacement for blanked out sensitive values +# Properties whose values are always blanked out in the HMC log entries +BLANKED_OUT_PROPERTIES = [ + 'boot-ftp-password', # partition create/update + 'bind-password', # LDAP server def. create/update + 'ssc-master-pw', # image profile cr/upd, part. cr/upd, LPAR upd + 'password', # user create/update + 'zaware-master-pw', # image profile create/update, LPAR update +] def _handle_request_exc(exc, retry_timeout_config): @@ -263,7 +270,7 @@ def _headers_for_logging(headers): """ if headers and 'X-API-Session' in headers: headers = headers.copy() - headers['X-API-Session'] = BLANKED_OUT + headers['X-API-Session'] = BLANKED_OUT_STRING return headers @@ -465,7 +472,7 @@ def __repr__(self): f" _actual_host={self._actual_host!r},\n" f" _base_url={self._base_url!r},\n" f" _headers={headers!r},\n" - f" _session_id={BLANKED_OUT!r},\n" + f" _session_id={BLANKED_OUT_STRING!r},\n" f" _session={self._session!r}\n" f" _object_topic={self._object_topic!r}\n" f" _job_topic={self._job_topic!r}\n" @@ -960,8 +967,11 @@ def _log_http_request( # structured data such as a password or session IDs. pass else: - if 'password' in content_dict: - content_dict['password'] = BLANKED_OUT + for prop in BLANKED_OUT_PROPERTIES: + try: + content_dict[prop] = BLANKED_OUT_STRING + except KeyError: + pass content = dict2json(content_dict) trunc = 30000 if content_len > trunc: @@ -1029,11 +1039,11 @@ def _log_http_response( if 'request-headers' in content_dict: headers_dict = content_dict['request-headers'] if 'x-api-session' in headers_dict: - headers_dict['x-api-session'] = BLANKED_OUT + headers_dict['x-api-session'] = BLANKED_OUT_STRING if 'api-session' in content_dict: - content_dict['api-session'] = BLANKED_OUT + content_dict['api-session'] = BLANKED_OUT_STRING if 'session-credential' in content_dict: - content_dict['session-credential'] = BLANKED_OUT + content_dict['session-credential'] = BLANKED_OUT_STRING content = dict2json(content_dict) if status >= 400: content_label = 'content'
zhmcclient/_user.py+2 −2 modified@@ -149,7 +149,7 @@ def list(self, full_properties=False, filter_args=None): return self._list_with_operation( list_uri, result_prop, full_properties, filter_args, None) - @logged_api_call + @logged_api_call(blanked_properties=['password'], properties_pos=1) def create(self, properties): """ Create a new User in this HMC. @@ -256,7 +256,7 @@ def delete(self): self.get_properties_local(self.manager._name_prop, None)) self.cease_existence_local() - @logged_api_call + @logged_api_call(blanked_properties=['password'], properties_pos=1) def update_properties(self, properties): """ Update writeable properties of this User.
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
4News mentions
0No linked articles in our index yet.