303 lines
12 KiB
Python
303 lines
12 KiB
Python
#######################Django related imports####################
|
|
import os
|
|
import shutil
|
|
import random
|
|
import json
|
|
import ast
|
|
from rest_framework import generics, status
|
|
from rest_framework.views import APIView
|
|
from rest_framework.response import Response
|
|
from rest_framework.parsers import MultiPartParser, FormParser
|
|
from django.core.files.storage import default_storage
|
|
from django.core.files.base import ContentFile
|
|
from django.conf import settings
|
|
#################################################################
|
|
#API related imports
|
|
from .models import (
|
|
UserData,
|
|
Category,
|
|
Blog
|
|
)
|
|
from .serializers import (
|
|
UserDataSerializer,
|
|
ThemeDataSerializer,
|
|
CategorySerializer,
|
|
BlogSerializer,
|
|
UnifiedCategoryBlogSerializer,
|
|
MediaSerializer
|
|
)
|
|
################################################################
|
|
#Custom Imports
|
|
from .custom_storage import CustomStorage
|
|
|
|
#UserData related views#############################################
|
|
class UserDataUpdateAPIView(generics.RetrieveUpdateAPIView):
|
|
serializer_class = UserDataSerializer
|
|
|
|
def get_object(self):
|
|
obj, created = UserData.objects.get_or_create(pk=1)
|
|
return obj
|
|
|
|
class UserDataListAPIView(generics.ListAPIView):
|
|
queryset = UserData.objects.all()
|
|
serializer_class = UserDataSerializer
|
|
#####################################################################
|
|
|
|
#ThemeData related views#############################################
|
|
class ThemeDataUpdateAPIView(generics.RetrieveUpdateAPIView):
|
|
serializer_class = ThemeDataSerializer
|
|
|
|
def get_object(self):
|
|
obj, created = UserData.objects.get_or_create(pk=1)
|
|
return obj
|
|
|
|
class ThemeDataListAPIView(generics.ListAPIView):
|
|
queryset = UserData.objects.all()
|
|
serializer_class = ThemeDataSerializer
|
|
#####################################################################
|
|
|
|
#Category related views#############################################
|
|
class CategoryCreateAPIView(generics.CreateAPIView):
|
|
queryset = Category.objects.all()
|
|
serializer_class = CategorySerializer
|
|
|
|
class CategoryUpdateAPIView(generics.RetrieveUpdateAPIView):
|
|
queryset = Category.objects.all()
|
|
serializer_class = CategorySerializer
|
|
lookup_field = 'category_id'
|
|
|
|
class CategoryListAPIView(generics.ListAPIView):
|
|
queryset = Category.objects.all()
|
|
serializer_class = CategorySerializer
|
|
|
|
class CategoryDeleteAPIView(generics.DestroyAPIView):
|
|
queryset = Category.objects.all()
|
|
serializer_class = CategorySerializer
|
|
lookup_field = 'category_id'
|
|
|
|
class BlogsByCategoryAPIView(APIView):
|
|
def get(self, request, category_id):
|
|
try:
|
|
category = Category.objects.get(category_id=category_id)
|
|
except Category.DoesNotExist:
|
|
return Response({'message': 'Category not found'}, status=404)
|
|
|
|
serializer = UnifiedCategoryBlogSerializer(category)
|
|
return Response(serializer.data)
|
|
################################################################
|
|
|
|
#Blog related views##################################################
|
|
class BlogCreateAPIView(generics.CreateAPIView):
|
|
queryset = Blog.objects.all()
|
|
serializer_class = BlogSerializer
|
|
|
|
class BlogUpdateAPIView(generics.RetrieveUpdateAPIView):
|
|
queryset = Blog.objects.all()
|
|
serializer_class = BlogSerializer
|
|
lookup_field = 'blog_id'
|
|
|
|
class BlogRetrieveAPIView(generics.RetrieveAPIView):
|
|
queryset = Blog.objects.all()
|
|
serializer_class = BlogSerializer
|
|
lookup_field = 'blog_id'
|
|
|
|
class BlogDeleteAPIView(generics.DestroyAPIView):
|
|
queryset = Blog.objects.all()
|
|
serializer_class = BlogSerializer
|
|
lookup_field = 'blog_id'
|
|
|
|
def destroy(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
self.remove_directory(instance)
|
|
self.perform_destroy(instance)
|
|
return Response(status=status.HTTP_204_NO_CONTENT)
|
|
|
|
def remove_directory(self, instance):
|
|
print(f"Deleting media files for {instance}")
|
|
media_folder = os.path.join(settings.MEDIA_ROOT, 'data', 'blog', str(instance.blog_id))
|
|
try:
|
|
shutil.rmtree(media_folder)
|
|
print(f"Directory '{media_folder}' and all its contents have been removed")
|
|
except Exception as e:
|
|
print(f"Failed to remove {media_folder}. Reason: {e}")
|
|
|
|
####################################################################
|
|
|
|
|
|
class MediaUpload(APIView):
|
|
parser_classes = (MultiPartParser, FormParser)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
file_serializer = MediaSerializer(data=request.data)
|
|
if file_serializer.is_valid():
|
|
files = request.FILES.getlist('media')
|
|
resource_type = file_serializer.validated_data['resource_type']
|
|
resource_id = file_serializer.validated_data['resource_id']
|
|
file_path_base = f'data'
|
|
|
|
for f in files:
|
|
file_unique_slug = ''.join(random.choices('ABCDEabcde12345', k=6))
|
|
if resource_id != resource_type:
|
|
file_path = f"{file_path_base}/{resource_type}/{resource_id}/media/{file_unique_slug+resource_id+f.name}"
|
|
else:
|
|
file_path = f"{file_path_base}/{resource_type}/media/{file_unique_slug+resource_id+f.name}"
|
|
default_storage.save(file_path, f)
|
|
|
|
return Response(file_serializer.data, status=status.HTTP_201_CREATED)
|
|
else:
|
|
return Response(file_serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
class ListMedia(APIView):
|
|
def get(self, request, resource_type, resource_id, format=None):
|
|
if resource_id != resource_type:
|
|
media_folder = os.path.join(settings.MEDIA_ROOT, 'data', resource_type, resource_id, 'media')
|
|
else:
|
|
media_folder = os.path.join(settings.MEDIA_ROOT, 'data', resource_type, 'media')
|
|
|
|
if not os.path.exists(media_folder):
|
|
return Response({'error': 'Media directory not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
media_files = [f for f in os.listdir(media_folder) if f.endswith(('.png', '.jpg', '.jpeg'))]
|
|
if resource_id != resource_type:
|
|
media_urls = [request.build_absolute_uri(f'{settings.MEDIA_URL}data/{resource_type}/{resource_id}/media/' + f) for f in media_files]
|
|
else:
|
|
media_urls = [request.build_absolute_uri(f'{settings.MEDIA_URL}data/{resource_type}/media/' + f) for f in media_files]
|
|
|
|
return Response({'media': media_urls}, status=status.HTTP_200_OK)
|
|
|
|
def delete(self, request, resource_type, resource_id, format=None):
|
|
if resource_id != resource_type:
|
|
media_folder = os.path.join(settings.MEDIA_ROOT, 'data', resource_type, resource_id, 'media')
|
|
else:
|
|
media_folder = os.path.join(settings.MEDIA_ROOT, 'data', resource_type, 'media')
|
|
file_name = request.query_params.get('file')
|
|
if not file_name or not file_name.endswith(('.png', '.jpg', '.jpeg')):
|
|
return Response({'error': 'Invalid or no file name provided'}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
file_path = os.path.join(media_folder, file_name)
|
|
if os.path.exists(file_path):
|
|
os.remove(file_path)
|
|
return Response({'message': 'File deleted successfully'}, status=status.HTTP_204_NO_CONTENT)
|
|
else:
|
|
return Response({'error': 'File not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
class PublishMethods(APIView):
|
|
def get(self, request, format=None):
|
|
deployment_methods = [
|
|
{
|
|
"key_name": "server_deploy",
|
|
"name": "Server Deploy"
|
|
},
|
|
{
|
|
"key_name": "github_deploy",
|
|
"name": "Github Deploy"
|
|
}
|
|
]
|
|
return Response(deployment_methods, status=status.HTTP_200_OK)
|
|
|
|
class Publish(APIView):
|
|
def get(self, request, deploy_type, format=None):
|
|
storage = CustomStorage()
|
|
self.create_json(storage)
|
|
return Response({"deploy_type": deploy_type}, status=status.HTTP_200_OK)
|
|
|
|
def create_json(self, storage):
|
|
self.create_user_data_json(UserData.objects.first(), storage)
|
|
self.create_theme_data_json(UserData.objects.first(), storage)
|
|
self.create_category_data_json(Category, storage)
|
|
self.create_blog_data_json(Blog.objects.all(), storage)
|
|
self.merge_media()
|
|
|
|
def create_user_data_json(self, instance, storage):
|
|
json_content = {
|
|
"name": instance.name,
|
|
"introContent": instance.intro_content,
|
|
"profilePhoto": self.sanitize_media_link(instance.profile_photo),
|
|
"builtWith": instance.built_with
|
|
}
|
|
self.save_json(json_content, 'shared/user-data.json', storage)
|
|
|
|
def create_theme_data_json(self, instance, storage):
|
|
json_content = {
|
|
"defaultTheme": instance.default_theme,
|
|
"darkTheme": ast.literal_eval(instance.dark_theme),
|
|
"lightTheme": ast.literal_eval(instance.light_theme),
|
|
}
|
|
self.save_json(json_content, 'shared/theme-config.json', storage)
|
|
|
|
def create_category_data_json(self, model_instance, storage):
|
|
categories = []
|
|
instance_objects = model_instance.objects.all()
|
|
if not instance_objects.exists():
|
|
self.save_json([], 'category/category-metadata.json', storage)
|
|
else:
|
|
for eachInstance in instance_objects:
|
|
instance_data = {
|
|
"id": str(eachInstance.category_id),
|
|
"name": eachInstance.name,
|
|
"coverImage": self.sanitize_media_link(eachInstance.cover_image),
|
|
"tagLine": eachInstance.tagline,
|
|
"description": eachInstance.description,
|
|
"featuredBlog": eachInstance.featured_id
|
|
}
|
|
categories.append(instance_data)
|
|
self.create_instance_data(instance_data, model_instance.objects.get(category_id=eachInstance.category_id), storage)
|
|
self.save_json(categories, 'category/category-metadata.json', storage)
|
|
|
|
|
|
def create_blog_data_json(self, instance, storage):
|
|
if not instance.exists():
|
|
pass
|
|
else:
|
|
for eachBlog in instance:
|
|
instance_data = {
|
|
"id": str(eachBlog.blog_id),
|
|
"name": eachBlog.name,
|
|
"description": eachBlog.description,
|
|
"coverimage": self.sanitize_media_link(eachBlog.cover_image),
|
|
"tagLine": eachBlog.tagline,
|
|
"parentCategory": str(eachBlog.parent_category.category_id),
|
|
"contentBody": eachBlog.content_body
|
|
}
|
|
self.save_json(instance_data, f'blog/{instance_data["id"]}/blog-data.json', storage)
|
|
|
|
def merge_media(self):
|
|
source_dir = 'media/data'
|
|
destination_dir = 'deploy/data'
|
|
if not os.path.exists(source_dir):
|
|
print("The source directory does not exist.")
|
|
else:
|
|
try:
|
|
shutil.copytree(source_dir, destination_dir, dirs_exist_ok=True)
|
|
print(f"Directory copied successfully from {source_dir} to {destination_dir}")
|
|
except Exception as e:
|
|
print(f"Error occurred: {e}")
|
|
|
|
def create_instance_data(self, instance_data, blogs_by_category_instance, storage):
|
|
instance_data["blogMetadata"]=[]
|
|
blogs = blogs_by_category_instance.blogs.all()
|
|
if not blogs.exists():
|
|
self.save_json(instance_data, f'category/{instance_data["id"]}/category-data.json', storage)
|
|
else:
|
|
for eachBlog in blogs:
|
|
instance_data["blogMetadata"].append({
|
|
"id": str(eachBlog.blog_id),
|
|
"name": eachBlog.name,
|
|
"description": eachBlog.description,
|
|
"coverimage": self.sanitize_media_link(eachBlog.cover_image),
|
|
"tagLine": eachBlog.tagline,
|
|
"parentCategory": instance_data["id"]
|
|
})
|
|
self.save_json(instance_data, f'category/{instance_data["id"]}/category-data.json', storage)
|
|
|
|
def save_json(self, json_content, file_name, storage):
|
|
data_json = json.dumps(json_content, indent=2)
|
|
if storage.exists(file_name):
|
|
storage.delete(file_name)
|
|
storage.save(file_name, ContentFile(data_json.encode('utf-8')))
|
|
def sanitize_media_link(self, string):
|
|
if not string:
|
|
return ''
|
|
return string.replace('http://127.0.0.1:8000/media/data/', '')
|
|
|