Upstream: https://bugs.launchpad.net/brz/+bug/1710979 Upstream: https://sources.debian.org/patches/bzr/2.7.0+bzr6619-7+deb9u1/27_CVE-2017-14176/ Description: Prevent SSH command line options from being specified in bzr+ssh:// URLs Bug: https://bugs.launchpad.net/brz/+bug/1710979 Bug-Debian: https://bugs.debian.org/874429 Bug-Debian-Security: https://security-tracker.debian.org/tracker/CVE-2017-14176 Forwarded: no Author: Jelmer Vernooij Last-Update: 2017-11-26 === modified file 'bzrlib/tests/test_ssh_transport.py' --- bzrlib/tests/test_ssh_transport.py 2010-10-07 12:45:51 +0000 +++ bzrlib/tests/test_ssh_transport.py 2017-08-20 01:59:20 +0000 @@ -22,6 +22,7 @@ SSHCorpSubprocessVendor, LSHSubprocessVendor, SSHVendorManager, + StrangeHostname, ) @@ -161,6 +162,19 @@ class SubprocessVendorsTests(TestCase): + def test_openssh_command_tricked(self): + vendor = OpenSSHSubprocessVendor() + self.assertEqual( + vendor._get_vendor_specific_argv( + "user", "-oProxyCommand=blah", 100, command=["bzr"]), + ["ssh", "-oForwardX11=no", "-oForwardAgent=no", + "-oClearAllForwardings=yes", + "-oNoHostAuthenticationForLocalhost=yes", + "-p", "100", + "-l", "user", + "--", + "-oProxyCommand=blah", "bzr"]) + def test_openssh_command_arguments(self): vendor = OpenSSHSubprocessVendor() self.assertEqual( @@ -171,6 +185,7 @@ "-oNoHostAuthenticationForLocalhost=yes", "-p", "100", "-l", "user", + "--", "host", "bzr"] ) @@ -184,9 +199,16 @@ "-oNoHostAuthenticationForLocalhost=yes", "-p", "100", "-l", "user", - "-s", "host", "sftp"] + "-s", "--", "host", "sftp"] ) + def test_openssh_command_tricked(self): + vendor = SSHCorpSubprocessVendor() + self.assertRaises( + StrangeHostname, + vendor._get_vendor_specific_argv, + "user", "-oProxyCommand=host", 100, command=["bzr"]) + def test_sshcorp_command_arguments(self): vendor = SSHCorpSubprocessVendor() self.assertEqual( @@ -209,6 +231,13 @@ "-s", "sftp", "host"] ) + def test_lsh_command_tricked(self): + vendor = LSHSubprocessVendor() + self.assertRaises( + StrangeHostname, + vendor._get_vendor_specific_argv, + "user", "-oProxyCommand=host", 100, command=["bzr"]) + def test_lsh_command_arguments(self): vendor = LSHSubprocessVendor() self.assertEqual( @@ -231,6 +260,13 @@ "--subsystem", "sftp", "host"] ) + def test_plink_command_tricked(self): + vendor = PLinkSubprocessVendor() + self.assertRaises( + StrangeHostname, + vendor._get_vendor_specific_argv, + "user", "-oProxyCommand=host", 100, command=["bzr"]) + def test_plink_command_arguments(self): vendor = PLinkSubprocessVendor() self.assertEqual( === modified file 'bzrlib/transport/ssh.py' --- bzrlib/transport/ssh.py 2015-07-31 01:04:41 +0000 +++ bzrlib/transport/ssh.py 2017-08-20 01:59:20 +0000 @@ -46,6 +46,10 @@ from paramiko.sftp_client import SFTPClient +class StrangeHostname(errors.BzrError): + _fmt = "Refusing to connect to strange SSH hostname %(hostname)s" + + SYSTEM_HOSTKEYS = {} BZR_HOSTKEYS = {} @@ -360,6 +364,11 @@ # tests, but beware of using PIPE which may hang due to not being read. _stderr_target = None + @staticmethod + def _check_hostname(arg): + if arg.startswith('-'): + raise StrangeHostname(hostname=arg) + def _connect(self, argv): # Attempt to make a socketpair to use as stdin/stdout for the SSH # subprocess. We prefer sockets to pipes because they support @@ -424,9 +433,9 @@ if username is not None: args.extend(['-l', username]) if subsystem is not None: - args.extend(['-s', host, subsystem]) + args.extend(['-s', '--', host, subsystem]) else: - args.extend([host] + command) + args.extend(['--', host] + command) return args register_ssh_vendor('openssh', OpenSSHSubprocessVendor()) @@ -439,6 +448,7 @@ def _get_vendor_specific_argv(self, username, host, port, subsystem=None, command=None): + self._check_hostname(host) args = [self.executable_path, '-x'] if port is not None: args.extend(['-p', str(port)]) @@ -460,6 +470,7 @@ def _get_vendor_specific_argv(self, username, host, port, subsystem=None, command=None): + self._check_hostname(host) args = [self.executable_path] if port is not None: args.extend(['-p', str(port)]) @@ -481,6 +492,7 @@ def _get_vendor_specific_argv(self, username, host, port, subsystem=None, command=None): + self._check_hostname(host) args = [self.executable_path, '-x', '-a', '-ssh', '-2', '-batch'] if port is not None: args.extend(['-P', str(port)])