rangolio/backend/apimanager/views.py

303 lines
12 KiB
Python

#######################Standard library, DRF and Django imports####################
import os
import shutil
import random
import json
import ast
from rest_framework import generics, status
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser, FormParser
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from django.conf import settings
#################################################################
#API related imports
from .models import (
UserData,
Category,
Blog
)
from .serializers import (
UserDataSerializer,
ThemeDataSerializer,
CategorySerializer,
BlogSerializer,
UnifiedCategoryBlogSerializer,
MediaSerializer
)
################################################################
#Custom Imports
from .custom_storage import CustomStorage
#UserData related views#############################################
class UserDataUpdateAPIView(generics.RetrieveUpdateAPIView):
    """Retrieve or update the single UserData record stored under pk=1."""

    serializer_class = UserDataSerializer

    def get_object(self):
        """Return the UserData row with pk=1, creating it when it is missing."""
        user_data, _created = UserData.objects.get_or_create(pk=1)
        return user_data
class UserDataListAPIView(generics.ListAPIView):
    """List all UserData records, serialized with UserDataSerializer."""

    serializer_class = UserDataSerializer
    queryset = UserData.objects.all()
#####################################################################
#ThemeData related views#############################################
class ThemeDataUpdateAPIView(generics.RetrieveUpdateAPIView):
    """Retrieve or update the theme view of the UserData record stored under pk=1."""

    serializer_class = ThemeDataSerializer

    def get_object(self):
        """Return the UserData row with pk=1, creating it when it is missing."""
        theme_source, _created = UserData.objects.get_or_create(pk=1)
        return theme_source
class ThemeDataListAPIView(generics.ListAPIView):
    """List all UserData records, serialized with ThemeDataSerializer."""

    serializer_class = ThemeDataSerializer
    queryset = UserData.objects.all()
#####################################################################
#Category related views#############################################
class CategoryCreateAPIView(generics.CreateAPIView):
    """Create a Category from the request payload via CategorySerializer."""

    serializer_class = CategorySerializer
    queryset = Category.objects.all()
class CategoryUpdateAPIView(generics.RetrieveUpdateAPIView):
    """Retrieve or update one Category, looked up by its ``category_id``."""

    lookup_field = 'category_id'
    serializer_class = CategorySerializer
    queryset = Category.objects.all()
class CategoryListAPIView(generics.ListAPIView):
    """List all Category records, serialized with CategorySerializer."""

    serializer_class = CategorySerializer
    queryset = Category.objects.all()
class CategoryDeleteAPIView(generics.DestroyAPIView):
    """Delete one Category, looked up by its ``category_id``."""

    lookup_field = 'category_id'
    serializer_class = CategorySerializer
    queryset = Category.objects.all()
class BlogsByCategoryAPIView(APIView):
    """Return a single category rendered through UnifiedCategoryBlogSerializer."""

    def get(self, request, category_id):
        """Serialize the category matching ``category_id``.

        Responds with status 404 and a ``message`` payload when no such
        category exists.
        """
        try:
            found_category = Category.objects.get(category_id=category_id)
        except Category.DoesNotExist:
            not_found_payload = {'message': 'Category not found'}
            return Response(not_found_payload, status=status.HTTP_404_NOT_FOUND)

        return Response(UnifiedCategoryBlogSerializer(found_category).data)
################################################################
#Blog related views##################################################
class BlogCreateAPIView(generics.CreateAPIView):
    """Create a Blog from the request payload via BlogSerializer."""

    serializer_class = BlogSerializer
    queryset = Blog.objects.all()
class BlogUpdateAPIView(generics.RetrieveUpdateAPIView):
    """Retrieve or update one Blog, looked up by its ``blog_id``."""

    lookup_field = 'blog_id'
    serializer_class = BlogSerializer
    queryset = Blog.objects.all()
class BlogRetrieveAPIView(generics.RetrieveAPIView):
    """Retrieve one Blog, looked up by its ``blog_id``."""

    lookup_field = 'blog_id'
    serializer_class = BlogSerializer
    queryset = Blog.objects.all()
class BlogDeleteAPIView(generics.DestroyAPIView):
    """Delete one Blog (looked up by ``blog_id``) and its media folder on disk."""

    queryset = Blog.objects.all()
    serializer_class = BlogSerializer
    lookup_field = 'blog_id'

    def destroy(self, request, *args, **kwargs):
        """Delete the blog row, then remove its media folder.

        The database delete runs first so that a failed delete does not leave
        a surviving blog whose media has already been wiped from disk. Folder
        removal is best-effort: problems are reported but never fail the
        request.

        Returns:
            An empty response with status 204.
        """
        instance = self.get_object()
        # Resolve the path before deleting: Django resets the primary key on the
        # in-memory instance after delete(), and blog_id may be that key.
        media_folder = self._media_folder_for(instance)
        print(f"Deleting media files for {instance}")
        self.perform_destroy(instance)
        self._remove_folder(media_folder)
        return Response(status=status.HTTP_204_NO_CONTENT)

    def remove_directory(self, instance):
        """Remove the media folder belonging to ``instance`` (best-effort).

        Args:
            instance: object exposing a ``blog_id`` attribute; the folder
                removed is ``MEDIA_ROOT/data/blog/<blog_id>``.
        """
        print(f"Deleting media files for {instance}")
        self._remove_folder(self._media_folder_for(instance))

    @staticmethod
    def _media_folder_for(instance):
        """Return the on-disk media folder path for the given blog instance."""
        return os.path.join(settings.MEDIA_ROOT, 'data', 'blog', str(instance.blog_id))

    @staticmethod
    def _remove_folder(media_folder):
        """Recursively delete ``media_folder``, reporting (not raising) on failure."""
        try:
            shutil.rmtree(media_folder)
        except FileNotFoundError:
            # A blog that never had uploads has no folder; that is not an error.
            print(f"No media directory to remove at '{media_folder}'")
        except OSError as e:
            print(f"Failed to remove {media_folder}. Reason: {e}")
        else:
            print(f"Directory '{media_folder}' and all its contents have been removed")
####################################################################
class MediaUpload(APIView):
    """Store uploaded ``media`` files for a resource through ``default_storage``."""

    parser_classes = (MultiPartParser, FormParser)

    def post(self, request, *args, **kwargs):
        """Validate the payload with MediaSerializer and save every uploaded file.

        Files go to ``data/<resource_type>/<resource_id>/media/`` or, when
        ``resource_id`` equals ``resource_type``, to
        ``data/<resource_type>/media/``. Each stored name is prefixed with a
        random six-character slug followed by the resource id.

        Returns:
            201 with the serializer data on success, 400 with the serializer
            errors when validation fails.
        """
        file_serializer = MediaSerializer(data=request.data)
        if not file_serializer.is_valid():
            return Response(file_serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        uploads = request.FILES.getlist('media')
        resource_type = file_serializer.validated_data['resource_type']
        resource_id = file_serializer.validated_data['resource_id']

        # The target directory is the same for every file in the request.
        root = 'data'
        if resource_id != resource_type:
            target_dir = f"{root}/{resource_type}/{resource_id}/media"
        else:
            target_dir = f"{root}/{resource_type}/media"

        for upload in uploads:
            slug = ''.join(random.choices('ABCDEabcde12345', k=6))
            stored_name = slug + resource_id + upload.name
            default_storage.save(f"{target_dir}/{stored_name}", upload)

        return Response(file_serializer.data, status=status.HTTP_201_CREATED)
class ListMedia(APIView):
    """List or delete image files stored for a resource under ``MEDIA_ROOT/data``."""

    # Only files with one of these suffixes are listed or may be deleted.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def get(self, request, resource_type, resource_id, format=None):
        """Return absolute URLs of the images in the resource's media folder.

        Returns:
            200 with ``{'media': [...]}``, 404 when the media folder does not
            exist, or 400 when the URL segments are not plain path names.
        """
        relative_dir = self._relative_media_dir(resource_type, resource_id)
        if relative_dir is None:
            return Response({'error': 'Invalid resource path'}, status=status.HTTP_400_BAD_REQUEST)

        media_folder = os.path.join(settings.MEDIA_ROOT, *relative_dir)
        if not os.path.isdir(media_folder):
            return Response({'error': 'Media directory not found'}, status=status.HTTP_404_NOT_FOUND)

        base_url = f"{settings.MEDIA_URL}{'/'.join(relative_dir)}/"
        media_urls = [
            request.build_absolute_uri(base_url + name)
            for name in os.listdir(media_folder)
            if name.endswith(self.IMAGE_EXTENSIONS)
        ]
        return Response({'media': media_urls}, status=status.HTTP_200_OK)

    def delete(self, request, resource_type, resource_id, format=None):
        """Delete the image named by the ``file`` query parameter.

        The name must be a bare file name with an allowed image suffix. Names
        containing directory components are rejected so a request cannot
        reach files outside the resource's media folder.

        Returns:
            204 on success, 400 for an invalid resource path or file name,
            404 when the file does not exist.
        """
        relative_dir = self._relative_media_dir(resource_type, resource_id)
        if relative_dir is None:
            return Response({'error': 'Invalid resource path'}, status=status.HTTP_400_BAD_REQUEST)

        file_name = request.query_params.get('file')
        if (
            not file_name
            or not file_name.endswith(self.IMAGE_EXTENSIONS)
            or not self._is_plain_name(file_name)
        ):
            return Response({'error': 'Invalid or no file name provided'}, status=status.HTTP_400_BAD_REQUEST)

        file_path = os.path.join(settings.MEDIA_ROOT, *relative_dir, file_name)
        if not os.path.isfile(file_path):
            return Response({'error': 'File not found'}, status=status.HTTP_404_NOT_FOUND)

        os.remove(file_path)
        return Response({'message': 'File deleted successfully'}, status=status.HTTP_204_NO_CONTENT)

    @classmethod
    def _relative_media_dir(cls, resource_type, resource_id):
        """Return the media folder as path parts relative to MEDIA_ROOT.

        Returns ``None`` when either segment is not a plain name. When
        ``resource_id`` equals ``resource_type`` the folder has no per-id level.
        """
        if not (cls._is_plain_name(resource_type) and cls._is_plain_name(resource_id)):
            return None
        if resource_id != resource_type:
            return ('data', resource_type, resource_id, 'media')
        return ('data', resource_type, 'media')

    @staticmethod
    def _is_plain_name(name):
        """Return True when ``name`` is a single path component.

        Rejects empty strings, ``.``/``..`` and anything with a directory part
        or drive prefix (``os.path.basename`` would strip those).
        """
        return bool(name) and name not in ('.', '..') and os.path.basename(name) == name
class PublishMethods(APIView):
    """Expose the deployment methods a client can choose from."""

    def get(self, request, format=None):
        """Return the available deployment methods as ``key_name``/``name`` pairs."""
        options = (
            ("server_deploy", "Server Deploy"),
            ("github_deploy", "Github Deploy"),
        )
        deployment_methods = [
            {"key_name": key, "name": label}
            for key, label in options
        ]
        return Response(deployment_methods, status=status.HTTP_200_OK)
class Publish(APIView):
    """Export user, theme, category and blog data as JSON files and copy media."""

    def get(self, request, deploy_type, format=None):
        """Run the export and echo ``deploy_type`` back.

        ``deploy_type`` is only returned in the response; it does not
        influence what is exported.

        Returns:
            200 with ``{"deploy_type": ...}``, or 404 when there is no
            UserData row to export.
        """
        user_data = UserData.objects.first()
        if user_data is None:
            # Without this guard the export crashes on attribute access of None.
            return Response({'error': 'No user data found to publish'}, status=status.HTTP_404_NOT_FOUND)
        storage = CustomStorage()
        self.create_json(storage, user_data)
        return Response({"deploy_type": deploy_type}, status=status.HTTP_200_OK)

    def create_json(self, storage, user_data=None):
        """Write every JSON export to ``storage`` and then copy the media tree.

        Args:
            storage: storage object offering ``exists``/``delete``/``save``.
            user_data: UserData instance to export; when omitted the first
                UserData row is loaded (it must exist).
        """
        if user_data is None:
            user_data = UserData.objects.first()
        self.create_user_data_json(user_data, storage)
        self.create_theme_data_json(user_data, storage)
        self.create_category_data_json(Category, storage)
        self.create_blog_data_json(Blog.objects.all(), storage)
        self.merge_media()

    def create_user_data_json(self, instance, storage):
        """Write ``shared/user-data.json`` from the given UserData instance."""
        json_content = {
            "name": instance.name,
            "introContent": instance.intro_content,
            "profilePhoto": self.sanitize_media_link(instance.profile_photo),
            "builtWith": instance.built_with
        }
        self.save_json(json_content, 'shared/user-data.json', storage)

    def create_theme_data_json(self, instance, storage):
        """Write ``shared/theme-config.json`` from the given UserData instance.

        ``dark_theme`` and ``light_theme`` are parsed with ``ast.literal_eval``,
        so a value that is not a valid Python literal raises here.
        """
        json_content = {
            "defaultTheme": instance.default_theme,
            "darkTheme": ast.literal_eval(instance.dark_theme),
            "lightTheme": ast.literal_eval(instance.light_theme),
        }
        self.save_json(json_content, 'shared/theme-config.json', storage)

    def create_category_data_json(self, model_instance, storage):
        """Write the category index plus one data file per category.

        Args:
            model_instance: the model class whose ``objects.all()`` yields the
                categories (called with ``Category``).
            storage: storage object passed through to ``save_json``.
        """
        categories = []
        for category in model_instance.objects.all():
            category_data = {
                "id": str(category.category_id),
                "name": category.name,
                "coverImage": self.sanitize_media_link(category.cover_image),
                "tagLine": category.tagline,
                "description": category.description,
                "featuredBlog": category.featured_id
            }
            categories.append(category_data)
            # create_instance_data adds "blogMetadata" to category_data in place,
            # so the index file written below carries it too (existing behavior).
            self.create_instance_data(category_data, category, storage)
        # An empty category table still produces an index file holding [].
        self.save_json(categories, 'category/category-metadata.json', storage)

    def create_blog_data_json(self, instance, storage):
        """Write ``blog/<id>/blog-data.json`` for every blog in ``instance``.

        Args:
            instance: iterable of Blog objects (called with ``Blog.objects.all()``).
            storage: storage object passed through to ``save_json``.
        """
        for blog in instance:
            blog_data = {
                "id": str(blog.blog_id),
                "name": blog.name,
                "description": blog.description,
                "coverimage": self.sanitize_media_link(blog.cover_image),
                "tagLine": blog.tagline,
                "parentCategory": str(blog.parent_category.category_id),
                "contentBody": blog.content_body
            }
            self.save_json(blog_data, f'blog/{blog_data["id"]}/blog-data.json', storage)

    def merge_media(self):
        """Copy the uploaded media tree into ``deploy/data`` (best-effort).

        The source is ``MEDIA_ROOT/data``, the same location the other views in
        this module read and delete media from. Failures are reported, not raised.
        """
        source_dir = os.path.join(settings.MEDIA_ROOT, 'data')
        # NOTE(review): the destination is still relative to the working
        # directory; presumably it matches CustomStorage's location - confirm.
        destination_dir = 'deploy/data'
        if not os.path.isdir(source_dir):
            print("The source directory does not exist.")
            return
        try:
            shutil.copytree(source_dir, destination_dir, dirs_exist_ok=True)
        except OSError as e:
            # shutil.Error is an OSError subclass, so copy failures land here too.
            print(f"Error occurred: {e}")
        else:
            print(f"Directory copied successfully from {source_dir} to {destination_dir}")

    def create_instance_data(self, instance_data, blogs_by_category_instance, storage):
        """Attach blog summaries to ``instance_data`` and write the category file.

        Args:
            instance_data: category dict; gains a ``blogMetadata`` list in place.
            blogs_by_category_instance: category object whose ``blogs`` relation
                supplies the blogs to summarize.
            storage: storage object passed through to ``save_json``.
        """
        instance_data["blogMetadata"] = [
            {
                "id": str(blog.blog_id),
                "name": blog.name,
                "description": blog.description,
                "coverimage": self.sanitize_media_link(blog.cover_image),
                "tagLine": blog.tagline,
                "parentCategory": instance_data["id"]
            }
            for blog in blogs_by_category_instance.blogs.all()
        ]
        self.save_json(instance_data, f'category/{instance_data["id"]}/category-data.json', storage)

    def save_json(self, json_content, file_name, storage):
        """Serialize ``json_content`` and store it as ``file_name``, replacing any existing file."""
        data_json = json.dumps(json_content, indent=2)
        # Delete first so the new file is stored under exactly this name.
        if storage.exists(file_name):
            storage.delete(file_name)
        storage.save(file_name, ContentFile(data_json.encode('utf-8')))

    def sanitize_media_link(self, string):
        """Strip the local media URL prefix from ``string``.

        Returns an empty string for falsy input.
        """
        if not string:
            return ''
        # NOTE(review): the prefix is hard-coded to the local development host;
        # links saved under any other host pass through unchanged - confirm intent.
        return string.replace('http://127.0.0.1:8000/media/data/', '')