parse and save email attachments
This commit is contained in:
parent
f9a95317a2
commit
734af10525
8 changed files with 357 additions and 76 deletions
|
@ -27,12 +27,16 @@ class FileManager(models.Manager):
|
|||
kwargs['file'] = ContentFile(content, content_hash)
|
||||
kwargs['hash'] = content_hash
|
||||
kwargs['mime_type'] = mime_type
|
||||
elif 'file' in kwargs and 'hash' in kwargs and type(kwargs['file']) == ContentFile and 'mime_type' in kwargs:
|
||||
pass
|
||||
else:
|
||||
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
|
||||
try:
|
||||
return self.get(hash=kwargs['hash']), False
|
||||
except self.model.DoesNotExist:
|
||||
return self.create(**kwargs), True
|
||||
obj = super().create(**kwargs)
|
||||
obj.file.save(content=kwargs['file'], name=kwargs['hash'])
|
||||
return obj, True
|
||||
|
||||
def create(self, **kwargs):
|
||||
if 'data' in kwargs and type(kwargs['data']) == str:
|
||||
|
@ -51,23 +55,32 @@ class FileManager(models.Manager):
|
|||
kwargs['file'] = ContentFile(content, content_hash)
|
||||
kwargs['hash'] = content_hash
|
||||
kwargs['mime_type'] = mime_type
|
||||
elif 'file' in kwargs and 'hash' in kwargs and type(kwargs['file']) == ContentFile:
|
||||
elif 'file' in kwargs and 'hash' in kwargs and type(kwargs['file']) == ContentFile and 'mime_type' in kwargs:
|
||||
pass
|
||||
else:
|
||||
raise ValueError('data must be a base64 encoded string or file and hash must be provided')
|
||||
if not self.filter(hash=kwargs['hash']).exists():
|
||||
return super().create(**kwargs)
|
||||
obj = super().create(**kwargs)
|
||||
obj.file.save(content=kwargs['file'], name=kwargs['hash'])
|
||||
return obj
|
||||
else:
|
||||
raise IntegrityError('File with this hash already exists')
|
||||
|
||||
|
||||
class File(models.Model):
|
||||
item = models.ForeignKey(Item, models.CASCADE, db_column='iid', null=True, blank=True, related_name='files')
|
||||
class AbstractFile(models.Model):
|
||||
created_at = models.DateTimeField(blank=True, null=True)
|
||||
updated_at = models.DateTimeField(blank=True, null=True)
|
||||
deleted_at = models.DateTimeField(blank=True, null=True)
|
||||
file = models.ImageField(upload_to=hash_upload)
|
||||
file = models.FileField(upload_to=hash_upload)
|
||||
mime_type = models.CharField(max_length=255, null=False, blank=False)
|
||||
hash = models.CharField(max_length=64, null=False, blank=False, unique=True)
|
||||
|
||||
objects = FileManager()
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
|
||||
class File(AbstractFile):
|
||||
item = models.ForeignKey(Item, models.CASCADE, db_column='iid', null=True, blank=True, related_name='files')
|
||||
pass
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from django.test import TestCase, Client
|
||||
from django.core.files.base import ContentFile
|
||||
|
||||
from files.models import File
|
||||
from inventory.models import Event, Container, Item
|
||||
|
@ -13,13 +14,27 @@ class FileTestCase(TestCase):
|
|||
self.event = Event.objects.create(slug='EVENT', name='Event')
|
||||
self.box = Container.objects.create(name='BOX')
|
||||
|
||||
def test_create_file_raw(self):
|
||||
from hashlib import sha256
|
||||
content = b"foo"
|
||||
chash = sha256(content).hexdigest()
|
||||
item = Item.objects.create(container=self.box, event=self.event, description='1')
|
||||
file = File.objects.create(file=ContentFile(b"foo"), mime_type='text/plain', hash=chash, item=item)
|
||||
file.save()
|
||||
self.assertEqual(1, len(File.objects.all()))
|
||||
self.assertEqual(content, File.objects.all()[0].file.read())
|
||||
self.assertEqual(chash, File.objects.all()[0].hash)
|
||||
|
||||
def test_list_files(self):
|
||||
import base64
|
||||
|
||||
item = File.objects.create(data="data:text/plain;base64," + base64.b64encode(b"foo").decode('utf-8'))
|
||||
response = client.get('/api/1/files')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json()[0]['hash'], item.hash)
|
||||
self.assertEqual(len(response.json()[0]['hash']), 64)
|
||||
self.assertEqual(len(File.objects.all()), 1)
|
||||
self.assertEqual(File.objects.all()[0].file.read(), b"foo")
|
||||
|
||||
def test_one_file(self):
|
||||
import base64
|
||||
|
@ -28,6 +43,8 @@ class FileTestCase(TestCase):
|
|||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json()['hash'], item.hash)
|
||||
self.assertEqual(len(response.json()['hash']), 64)
|
||||
self.assertEqual(len(File.objects.all()), 1)
|
||||
self.assertEqual(File.objects.all()[0].file.read(), b"foo")
|
||||
|
||||
def test_create_file(self):
|
||||
import base64
|
||||
|
@ -38,6 +55,8 @@ class FileTestCase(TestCase):
|
|||
content_type='application/json')
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertEqual(len(response.json()['hash']), 64)
|
||||
self.assertEqual(len(File.objects.all()), 1)
|
||||
self.assertEqual(File.objects.all()[0].file.read(), b"foo")
|
||||
|
||||
def test_delete_file(self):
|
||||
import base64
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue