1010from django .core import files
1111from django .core .files import File as DjangoFile
1212from django .core .files .storage import default_storage
13+ from django .core .files .temp import NamedTemporaryFile
1314from django .core .management import call_command
1415from django .db import models
1516from django .test import TestCase
2122
2223DIR = os .path .abspath (os .path .split (__file__ )[0 ])
2324
25+
2426class DatabaseFilesTestCase (TestCase ):
2527
2628 def setUp (self ):
@@ -52,6 +54,9 @@ def test_adding_file(self):
5254 q = File .objects .all ()
5355 self .assertEqual (q .count (), 0 )
5456
57+ # Verify that the storage reports that the file exists
58+ self .assertTrue (o .upload .storage .exists (o .upload .name ))
59+
5560 # Verify we can read the contents of thing.
5661 o = Thing .objects .get (id = obj_id )
5762 self .assertEqual (o .upload .read (), b"hello there" )
@@ -67,7 +72,6 @@ def test_adding_file(self):
6772 test_file = files .temp .NamedTemporaryFile (
6873 suffix = '.txt' ,
6974 # Django>=1.10 no longer allows accessing files outside of MEDIA_ROOT...
70- #dir=files.temp.gettempdir()
7175 dir = os .path .join (settings .PROJECT_DIR , 'media' ),
7276 )
7377 data0 = b'1234567890'
@@ -88,7 +92,9 @@ def test_adding_file(self):
8892 self .assertEqual (os .path .isfile (test_fqfn ), True )
8993 os .remove (test_fqfn )
9094 self .assertEqual (os .path .isfile (test_fqfn ), False )
91- o1 .upload .read () # This forces the re-export to the filesystem.
95+ # Verify that the storage still reports that the file exists
96+ self .assertTrue (o1 .upload .storage .exists (o1 .upload .name ))
97+ o1 .upload .read () # This forces the re-export to the filesystem.
9298 self .assertEqual (os .path .isfile (test_fqfn ), True )
9399
94100 # This dumps all files to the filesystem.
@@ -115,42 +121,86 @@ def test_adding_extzipfile(self):
115121 self .assertEqual (six .BytesIO (uploaded .content ).read (), testfile .read ())
116122
117123 def test_hash (self ):
118- verbose = 1
119-
120124 # Create test file.
121125 image_content = open (os .path .join (DIR , 'fixtures/test_image.png' ), 'rb' ).read ()
122126 fqfn = os .path .join (self .media_dir , 'image.png' )
123127 open (fqfn , 'wb' ).write (image_content )
124128
125129 # Calculate hash from various sources and confirm they all match.
126- expected_hash = '35830221efe45ab0dc3d91ca23c29d2d3c20d00c9afeaa096ab256ec322a7a0b3293f07a01377e31060e65b4e5f6f8fdb4c0e56bc586bba5a7ab3e6d6d97a192' # pylint: disable=C0301
130+ expected_hash = '35830221efe45ab0dc3d91ca23c29d2d3c20d00c9afeaa096ab256ec322a7a0b3293f07a01377e31060e65b4e5f6f8fdb4c0e56bc586bba5a7ab3e6d6d97a192' # pylint: disable=C0301
127131 h = utils .get_text_hash (image_content )
128132 self .assertEqual (h , expected_hash )
129133 h = utils .get_file_hash (fqfn )
130134 self .assertEqual (h , expected_hash )
131135 h = utils .get_text_hash (open (fqfn , 'rb' ).read ())
132136 self .assertEqual (h , expected_hash )
133- # h = utils.get_text_hash(open(fqfn, 'r').read())#not supported in py3
134- # self.assertEqual(h, expected_hash)
135137
136138 # Create test file.
137139 if six .PY3 :
138- image_content = six .text_type ('aあä' )#, encoding='utf-8')
140+ image_content = six .text_type ('aあä' )
139141 else :
140142 image_content = six .text_type ('aあä' , encoding = 'utf-8' )
141143 fqfn = os .path .join (self .media_dir , 'test.txt' )
142144 open (fqfn , 'wb' ).write (image_content .encode ('utf-8' ))
143145
144- expected_hash = '1f40fc92da241694750979ee6cf582f2d5d7d28e18335de05abc54d0560e0f5302860c652bf08d560252aa5e74210546f369fbbbce8c12cfc7957b2652fe9a75' # pylint: disable=C0301
146+ expected_hash = '1f40fc92da241694750979ee6cf582f2d5d7d28e18335de05abc54d0560e0f5302860c652bf08d560252aa5e74210546f369fbbbce8c12cfc7957b2652fe9a75' # pylint: disable=C0301
145147 h = utils .get_text_hash (image_content )
146148 self .assertEqual (h , expected_hash )
147149 h = utils .get_file_hash (fqfn )
148150 self .assertEqual (h , expected_hash )
149151 h = utils .get_text_hash (open (fqfn , 'rb' ).read ())
150152 self .assertEqual (h , expected_hash )
151153
154+ def test_get_available_name (self ):
155+ storage = DatabaseStorage (location = os .path .join (DIR , 'media/custom_location' ))
156+ if os .path .isdir (storage .location ):
157+ shutil .rmtree (storage .location )
158+
159+ class CustomLocationThing (models .Model ):
160+ upload = models .FileField (storage = storage , max_length = 500 )
161+
162+ class Meta :
163+ managed = False
164+ db_table = Thing ()._meta .db_table
165+
166+ # Create a file in a subdirectory of the custom location and save it
167+ test_fn = os .path .join ('subdirectory' , 'test.txt' )
168+ test_fqfn = os .path .join (storage .location , 'subdirectory' , 'test.txt' )
169+ with NamedTemporaryFile () as t :
170+ t .write (b"hello there" )
171+ o = CustomLocationThing ()
172+ o .upload = files .File (t , name = test_fn )
173+ o .save ()
174+
175+ # Verify we can read the contents of thing.
176+ o = CustomLocationThing .objects .first ()
177+ self .assertEqual (o .upload .read (), b"hello there" )
178+ self .assertEqual (File .objects .count (), 1 )
179+ self .assertEqual (File .objects .first ().name , os .path .join ('custom_location' , 'subdirectory' , 'test.txt' ))
180+
181+ # Delete file from local filesystem
182+ self .assertEqual (os .path .isfile (test_fqfn ), True )
183+ os .remove (test_fqfn )
184+ self .assertEqual (os .path .isfile (test_fqfn ), False )
185+
186+ # Create another (different) file in the same subdirectory of the custom location and save it
187+ with NamedTemporaryFile () as t :
188+ t .write (b"goodbye" )
189+ o2 = o = CustomLocationThing ()
190+ o .upload = files .File (t , name = test_fn )
191+ o .save ()
192+
193+ # Verify we can read the contents of thing.
194+ o = CustomLocationThing .objects .get (pk = o2 .pk )
195+ self .assertEqual (o .upload .read (), b"goodbye" )
196+ self .assertEqual (File .objects .count (), 2 )
197+ # 'custom_location/subdirectory/test.txt' is already taken, so the second file
198+ # should have a different name
199+ self .assertNotEqual (o .upload .name , os .path .join ('custom_location' , 'subdirectory' , 'test.txt' ))
200+ self .assertTrue (o .upload .name .startswith (os .path .join ('custom_location' , 'subdirectory' , 'test_' )))
201+
152202 def test_custom_location (self ):
153- storage = DatabaseStorage (location = os .path .join (DIR , 'media/location ' ))
203+ storage = DatabaseStorage (location = os .path .join (DIR , 'media/custom_location ' ))
154204 if os .path .isdir (storage .location ):
155205 shutil .rmtree (storage .location )
156206
@@ -189,10 +239,126 @@ class Meta:
189239 self .assertEqual (q .count (), 1 )
190240 self .assertEqual (six .BytesIO (q .first ().content ).getvalue (), b"hello there" )
191241
242+ # Verify that the storage reports that the file exists
243+ self .assertTrue (o .upload .storage .exists (o .upload .name ))
244+
245+ # Delete file from local filesystem and re-export it from the database.
246+ self .assertEqual (os .path .isfile (test_fqfn ), True )
247+ os .remove (test_fqfn )
248+ self .assertEqual (os .path .isfile (test_fqfn ), False )
249+ # Verify that the storage still reports that the file exists
250+ self .assertTrue (o1 .upload .storage .exists (o1 .upload .name ))
251+ o1 .upload .read () # This forces the re-export to the filesystem.
252+ self .assertEqual (os .path .isfile (test_fqfn ), True )
253+
254+ # Verify that deleting the file from the storage removes both the database and the local copy
255+ o .upload .delete ()
256+ self .assertEqual (os .path .isfile (test_fqfn ), False )
257+ self .assertEqual (q .count (), 0 )
258+ self .assertFalse (o1 .upload .storage .exists (o1 .upload .name ))
259+
260+ def test_duplicate_name_in_different_locations (self ):
261+ storage1 = DatabaseStorage (location = os .path .join (DIR , 'media/location1' ))
262+ if os .path .isdir (storage1 .location ):
263+ shutil .rmtree (storage1 .location )
264+
265+ storage2 = DatabaseStorage (location = os .path .join (DIR , 'media/location2' ))
266+ if os .path .isdir (storage2 .location ):
267+ shutil .rmtree (storage2 .location )
268+
269+ def get_name (instance , filename ):
270+ return "dummy.txt"
271+
272+ class Location1Thing (models .Model ):
273+ upload = models .FileField (storage = storage1 , upload_to = get_name , max_length = 500 )
274+
275+ class Meta :
276+ managed = False
277+ db_table = Thing ()._meta .db_table
278+
279+ class Location2Thing (models .Model ):
280+ upload = models .FileField (storage = storage2 , upload_to = get_name , max_length = 500 )
281+
282+ class Meta :
283+ managed = False
284+ db_table = Thing ()._meta .db_table
285+
286+ tmpdir = tempfile .mkdtemp (dir = os .path .join (settings .PROJECT_DIR , 'media' ))
287+ data1 = b'11111111'
288+ open (os .path .join (tmpdir , "dummy.txt" ), 'wb' ).write (data1 )
289+ upload = files .File (open (os .path .join (tmpdir , "dummy.txt" ), 'rb' ))
290+ t1 = Location1Thing (upload = upload )
291+ t1 .save ()
292+ self .assertTrue (t1 .upload .storage .exists (t1 .upload .name ))
293+ os .remove (t1 .upload .path )
294+ self .assertTrue (t1 .upload .storage .exists (t1 .upload .name ))
295+ self .assertEqual (t1 .upload .path , os .path .join (t1 .upload .storage .location , "dummy.txt" ))
296+ self .assertEqual (t1 .upload .path , Location1Thing .objects .get (pk = t1 .pk ).upload .path )
297+ data2 = b'22222222'
298+ open (os .path .join (tmpdir , "dummy.txt" ), 'wb' ).write (data2 )
299+ t2 = Location2Thing .objects .create (upload = files .File (open (os .path .join (tmpdir , "dummy.txt" ), 'rb' )))
300+ os .remove (t2 .upload .path )
301+ self .assertTrue (t2 .upload .storage .exists (t2 .upload .name ))
302+ self .assertEqual (t2 .upload .path , os .path .join (t2 .upload .storage .location , "dummy.txt" ))
303+ self .assertEqual (t2 .upload .path , Location2Thing .objects .get (pk = t2 .pk ).upload .path )
304+
305+ self .assertEqual (File .objects .count (), 2 )
306+ self .assertEqual (Location2Thing .objects .get (pk = t2 .pk ).upload .file .read (), data2 )
307+ self .assertEqual (Location1Thing .objects .get (pk = t1 .pk ).upload .file .read (), data1 )
308+ shutil .rmtree (tmpdir )
309+
310+ def test_custom_upload_to (self ):
311+ storage = DatabaseStorage (location = os .path .join (DIR , 'media/custom_location' ))
312+ if os .path .isdir (storage .location ):
313+ shutil .rmtree (storage .location )
314+
315+ def get_name (instance , filename ):
316+ return "dummy.txt"
317+
318+ class CustomUploadToThing (models .Model ):
319+ upload = models .FileField (storage = storage , upload_to = get_name , max_length = 500 )
320+
321+ class Meta :
322+ managed = False
323+ db_table = Thing ()._meta .db_table
324+
325+ # Create a file in a subdirectory of the custom location and save it
326+ test_fn = os .path .join ('subdirectory' , 'test.txt' )
327+ test_fqfn = os .path .join (storage .location , 'subdirectory' , 'test.txt' )
328+ os .makedirs (os .path .join (storage .location , 'subdirectory' ))
329+ open (test_fqfn , 'w' ).write ('hello there' )
330+ o1 = o = CustomUploadToThing ()
331+ o .upload = test_fn
332+ o .save ()
333+
334+ # Confirm thing was saved.
335+ q = CustomUploadToThing .objects .all ()
336+ self .assertEqual (q .count (), 1 )
337+ self .assertEqual (q [0 ].upload .name , test_fn )
338+
339+ # Confirm the file only exists on the file system
340+ # and hasn't been loaded into the database.
341+ q = File .objects .all ()
342+ self .assertEqual (q .count (), 0 )
343+
344+ # Verify we can read the contents of thing.
345+ o = CustomUploadToThing .objects .first ()
346+ self .assertEqual (o .upload .read (), b"hello there" )
347+
348+ # Verify that by attempting to read the file, we've automatically
349+ # loaded it into the database.
350+ self .assertEqual (q .count (), 1 )
351+ self .assertEqual (six .BytesIO (q .first ().content ).getvalue (), b"hello there" )
352+
353+ # Verify that the storage reports that the file exists
354+ self .assertTrue (o .upload .storage .exists (o .upload .name ))
355+
192356 # Delete file from local filesystem and re-export it from the database.
193357 self .assertEqual (os .path .isfile (test_fqfn ), True )
194358 os .remove (test_fqfn )
195359 self .assertEqual (os .path .isfile (test_fqfn ), False )
360+ # Verify that the storage still reports that the file exists
361+ self .assertTrue (o1 .upload .storage .exists (o1 .upload .name ))
196362 o1 .upload .read () # This forces the re-export to the filesystem.
197363 self .assertEqual (os .path .isfile (test_fqfn ), True )
198364
0 commit comments