Loading...
Note: File does not exist in v4.10.11.
1# SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
2
3from argparse import ArgumentParser
4from argparse import FileType
5import os
6import sys
7import tpm2
8from tpm2 import ProtocolError
9import unittest
10import logging
11import struct
12
13class SmokeTest(unittest.TestCase):
14 def setUp(self):
15 self.client = tpm2.Client()
16 self.root_key = self.client.create_root_key()
17
18 def tearDown(self):
19 self.client.flush_context(self.root_key)
20 self.client.close()
21
22 def test_seal_with_auth(self):
23 data = ('X' * 64).encode()
24 auth = ('A' * 15).encode()
25
26 blob = self.client.seal(self.root_key, data, auth, None)
27 result = self.client.unseal(self.root_key, blob, auth, None)
28 self.assertEqual(data, result)
29
30 def test_seal_with_policy(self):
31 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
32
33 data = ('X' * 64).encode()
34 auth = ('A' * 15).encode()
35 pcrs = [16]
36
37 try:
38 self.client.policy_pcr(handle, pcrs)
39 self.client.policy_password(handle)
40
41 policy_dig = self.client.get_policy_digest(handle)
42 finally:
43 self.client.flush_context(handle)
44
45 blob = self.client.seal(self.root_key, data, auth, policy_dig)
46
47 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
48
49 try:
50 self.client.policy_pcr(handle, pcrs)
51 self.client.policy_password(handle)
52
53 result = self.client.unseal(self.root_key, blob, auth, handle)
54 except:
55 self.client.flush_context(handle)
56 raise
57
58 self.assertEqual(data, result)
59
60 def test_unseal_with_wrong_auth(self):
61 data = ('X' * 64).encode()
62 auth = ('A' * 20).encode()
63 rc = 0
64
65 blob = self.client.seal(self.root_key, data, auth, None)
66 try:
67 result = self.client.unseal(self.root_key, blob,
68 auth[:-1] + 'B'.encode(), None)
69 except ProtocolError as e:
70 rc = e.rc
71
72 self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL)
73
74 def test_unseal_with_wrong_policy(self):
75 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
76
77 data = ('X' * 64).encode()
78 auth = ('A' * 17).encode()
79 pcrs = [16]
80
81 try:
82 self.client.policy_pcr(handle, pcrs)
83 self.client.policy_password(handle)
84
85 policy_dig = self.client.get_policy_digest(handle)
86 finally:
87 self.client.flush_context(handle)
88
89 blob = self.client.seal(self.root_key, data, auth, policy_dig)
90
91 # Extend first a PCR that is not part of the policy and try to unseal.
92 # This should succeed.
93
94 ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1)
95 self.client.extend_pcr(1, ('X' * ds).encode())
96
97 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
98
99 try:
100 self.client.policy_pcr(handle, pcrs)
101 self.client.policy_password(handle)
102
103 result = self.client.unseal(self.root_key, blob, auth, handle)
104 except:
105 self.client.flush_context(handle)
106 raise
107
108 self.assertEqual(data, result)
109
110 # Then, extend a PCR that is part of the policy and try to unseal.
111 # This should fail.
112 self.client.extend_pcr(16, ('X' * ds).encode())
113
114 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
115
116 rc = 0
117
118 try:
119 self.client.policy_pcr(handle, pcrs)
120 self.client.policy_password(handle)
121
122 result = self.client.unseal(self.root_key, blob, auth, handle)
123 except ProtocolError as e:
124 rc = e.rc
125 self.client.flush_context(handle)
126 except:
127 self.client.flush_context(handle)
128 raise
129
130 self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL)
131
132 def test_seal_with_too_long_auth(self):
133 ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1)
134 data = ('X' * 64).encode()
135 auth = ('A' * (ds + 1)).encode()
136
137 rc = 0
138 try:
139 blob = self.client.seal(self.root_key, data, auth, None)
140 except ProtocolError as e:
141 rc = e.rc
142
143 self.assertEqual(rc, tpm2.TPM2_RC_SIZE)
144
145 def test_too_short_cmd(self):
146 rejected = False
147 try:
148 fmt = '>HIII'
149 cmd = struct.pack(fmt,
150 tpm2.TPM2_ST_NO_SESSIONS,
151 struct.calcsize(fmt) + 1,
152 tpm2.TPM2_CC_FLUSH_CONTEXT,
153 0xDEADBEEF)
154
155 self.client.send_cmd(cmd)
156 except IOError as e:
157 rejected = True
158 except:
159 pass
160 self.assertEqual(rejected, True)
161
162 def test_read_partial_resp(self):
163 try:
164 fmt = '>HIIH'
165 cmd = struct.pack(fmt,
166 tpm2.TPM2_ST_NO_SESSIONS,
167 struct.calcsize(fmt),
168 tpm2.TPM2_CC_GET_RANDOM,
169 0x20)
170 self.client.tpm.write(cmd)
171 hdr = self.client.tpm.read(10)
172 sz = struct.unpack('>I', hdr[2:6])[0]
173 rsp = self.client.tpm.read()
174 except:
175 pass
176 self.assertEqual(sz, 10 + 2 + 32)
177 self.assertEqual(len(rsp), 2 + 32)
178
179 def test_read_partial_overwrite(self):
180 try:
181 fmt = '>HIIH'
182 cmd = struct.pack(fmt,
183 tpm2.TPM2_ST_NO_SESSIONS,
184 struct.calcsize(fmt),
185 tpm2.TPM2_CC_GET_RANDOM,
186 0x20)
187 self.client.tpm.write(cmd)
188 # Read part of the respone
189 rsp1 = self.client.tpm.read(15)
190
191 # Send a new cmd
192 self.client.tpm.write(cmd)
193
194 # Read the whole respone
195 rsp2 = self.client.tpm.read()
196 except:
197 pass
198 self.assertEqual(len(rsp1), 15)
199 self.assertEqual(len(rsp2), 10 + 2 + 32)
200
201 def test_send_two_cmds(self):
202 rejected = False
203 try:
204 fmt = '>HIIH'
205 cmd = struct.pack(fmt,
206 tpm2.TPM2_ST_NO_SESSIONS,
207 struct.calcsize(fmt),
208 tpm2.TPM2_CC_GET_RANDOM,
209 0x20)
210 self.client.tpm.write(cmd)
211
212 # expect the second one to raise -EBUSY error
213 self.client.tpm.write(cmd)
214 rsp = self.client.tpm.read()
215
216 except IOError as e:
217 # read the response
218 rsp = self.client.tpm.read()
219 rejected = True
220 pass
221 except:
222 pass
223 self.assertEqual(rejected, True)
224
225class SpaceTest(unittest.TestCase):
226 def setUp(self):
227 logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG)
228
229 def test_make_two_spaces(self):
230 log = logging.getLogger(__name__)
231 log.debug("test_make_two_spaces")
232
233 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
234 root1 = space1.create_root_key()
235 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
236 root2 = space2.create_root_key()
237 root3 = space2.create_root_key()
238
239 log.debug("%08x" % (root1))
240 log.debug("%08x" % (root2))
241 log.debug("%08x" % (root3))
242
243 def test_flush_context(self):
244 log = logging.getLogger(__name__)
245 log.debug("test_flush_context")
246
247 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
248 root1 = space1.create_root_key()
249 log.debug("%08x" % (root1))
250
251 space1.flush_context(root1)
252
253 def test_get_handles(self):
254 log = logging.getLogger(__name__)
255 log.debug("test_get_handles")
256
257 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
258 space1.create_root_key()
259 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
260 space2.create_root_key()
261 space2.create_root_key()
262
263 handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT)
264
265 self.assertEqual(len(handles), 2)
266
267 log.debug("%08x" % (handles[0]))
268 log.debug("%08x" % (handles[1]))
269
270 def test_invalid_cc(self):
271 log = logging.getLogger(__name__)
272 log.debug(sys._getframe().f_code.co_name)
273
274 TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1
275
276 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
277 root1 = space1.create_root_key()
278 log.debug("%08x" % (root1))
279
280 fmt = '>HII'
281 cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt),
282 TPM2_CC_INVALID)
283
284 rc = 0
285 try:
286 space1.send_cmd(cmd)
287 except ProtocolError as e:
288 rc = e.rc
289
290 self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
291 tpm2.TSS2_RESMGR_TPM_RC_LAYER)
292
293class AsyncTest(unittest.TestCase):
294 def setUp(self):
295 logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)
296
297 def test_async(self):
298 log = logging.getLogger(__name__)
299 log.debug(sys._getframe().f_code.co_name)
300
301 async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
302 log.debug("Calling get_cap in a NON_BLOCKING mode")
303 async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)
304 async_client.close()