126fa459cSmrg# Copyright 2016 The Brotli Authors. All rights reserved.
226fa459cSmrg#
326fa459cSmrg# Distributed under MIT license.
426fa459cSmrg# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
526fa459cSmrg
626fa459cSmrgimport functools
726fa459cSmrgimport unittest
826fa459cSmrg
926fa459cSmrgfrom . import _test_utils
1026fa459cSmrgimport brotli
1126fa459cSmrg
1226fa459cSmrg
1326fa459cSmrgdef _get_original_name(test_data):
1426fa459cSmrg    return test_data.split('.compressed')[0]
1526fa459cSmrg
1626fa459cSmrg
1726fa459cSmrgclass TestDecompressor(_test_utils.TestCase):
1826fa459cSmrg
1926fa459cSmrg    CHUNK_SIZE = 1
2026fa459cSmrg
2126fa459cSmrg    def setUp(self):
2226fa459cSmrg        self.decompressor = brotli.Decompressor()
2326fa459cSmrg
2426fa459cSmrg    def tearDown(self):
2526fa459cSmrg        self.decompressor = None
2626fa459cSmrg
2726fa459cSmrg    def _check_decompression(self, test_data):
2826fa459cSmrg        # Verify decompression matches the original.
2926fa459cSmrg        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
3026fa459cSmrg        original = _get_original_name(test_data)
3126fa459cSmrg        self.assertFilesMatch(temp_uncompressed, original)
3226fa459cSmrg
3326fa459cSmrg    def _decompress(self, test_data):
3426fa459cSmrg        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
3526fa459cSmrg        with open(temp_uncompressed, 'wb') as out_file:
3626fa459cSmrg            with open(test_data, 'rb') as in_file:
3726fa459cSmrg                read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
3826fa459cSmrg                for data in iter(read_chunk, b''):
3926fa459cSmrg                    out_file.write(self.decompressor.process(data))
4026fa459cSmrg        self.assertTrue(self.decompressor.is_finished())
4126fa459cSmrg
4226fa459cSmrg    def _test_decompress(self, test_data):
4326fa459cSmrg        self._decompress(test_data)
4426fa459cSmrg        self._check_decompression(test_data)
4526fa459cSmrg
4626fa459cSmrg    def test_garbage_appended(self):
4726fa459cSmrg        with self.assertRaises(brotli.error):
4826fa459cSmrg            self.decompressor.process(brotli.compress(b'a') + b'a')
4926fa459cSmrg
5026fa459cSmrg    def test_already_finished(self):
5126fa459cSmrg        self.decompressor.process(brotli.compress(b'a'))
5226fa459cSmrg        with self.assertRaises(brotli.error):
5326fa459cSmrg            self.decompressor.process(b'a')
5426fa459cSmrg
5526fa459cSmrg
5626fa459cSmrg_test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
5726fa459cSmrg
5826fa459cSmrgif __name__ == '__main__':
5926fa459cSmrg    unittest.main()
60