ajout de caldav_sync_manager et refactoring du copy de tache
This commit is contained in:
@@ -135,7 +135,9 @@ MIDDLEWARE = [
|
||||
]
|
||||
|
||||
CORS_ALLOWED_ORIGINS = [
|
||||
"http://33.144.144.13", # Ajoutez ici l'URL de votre application frontend ,
|
||||
"http://33.144.144.13:4173",
|
||||
"http://33.144.144.13:5173",
|
||||
"http://33.144.144.13:3000",
|
||||
]
|
||||
|
||||
|
||||
|
@@ -1,3 +1,7 @@
|
||||
from django.contrib import admin
|
||||
|
||||
from mycaldav.models import caldav_sync_manager
|
||||
|
||||
admin.site.register(caldav_sync_manager)
|
||||
|
||||
# Register your models here.
|
||||
|
@@ -1,3 +1,4 @@
|
||||
from django.db import models
|
||||
from datetime import datetime, timedelta, time
|
||||
from django.contrib.auth.models import User
|
||||
import datetime as Datetime
|
||||
@@ -22,6 +23,74 @@ from dateutil.parser import parse
|
||||
|
||||
Key_separator = "--"
|
||||
|
||||
|
||||
class caldav_sync_manager(models.Model):
|
||||
dtDate = models.DateField('Date_synchronized', auto_now_add=True, unique=True)
|
||||
bDone = models.BooleanField("Effectuée", default=False)
|
||||
dtUpdated = models.DateTimeField('date updated', auto_now=True)
|
||||
dtCreated = models.DateTimeField('date published', auto_now_add=True)
|
||||
|
||||
def init_caldav(self, caldav_url, caldav_user, caldav_password):
|
||||
client = caldav.DAVClient(url=caldav_url, username=caldav_user, password=caldav_password)
|
||||
data = client.principal()
|
||||
|
||||
self.a_task = data.calendar(cal_id=caldav_id["task"])
|
||||
|
||||
def set_today_as_synced(self):
|
||||
o_new_manager = caldav_sync_manager()
|
||||
o_new_manager.bDone=True
|
||||
o_new_manager.save()
|
||||
|
||||
def init_task_management(self):
|
||||
today = datetime.now()
|
||||
o_caldav_sync_management = caldav_sync_manager.objects.filter(dtDate=today.date())
|
||||
if not o_caldav_sync_management.exists() or o_caldav_sync_management.first().bDone is False:
|
||||
self.copy_caldav_data()
|
||||
else:
|
||||
print("pas de copy, sync déjà fait")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def copy_caldav_data(self,):
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
today = datetime.now()
|
||||
seeked_date = today + relativedelta(months=1)
|
||||
sync_date = today + relativedelta(months=1, days=1)
|
||||
modified_url = caldav_cfg["task_config"] + f"&start={int(today.timestamp())}&end={int(sync_date.timestamp())}&expand=1"
|
||||
|
||||
with urllib.request.urlopen(modified_url, context=ctx) as o_url:
|
||||
sabre_data = o_url.read()
|
||||
|
||||
events = recurring_ical_events.of(Calendar.from_ical(sabre_data)).at((seeked_date.year,seeked_date.month,seeked_date.day))
|
||||
|
||||
b_copy_done = False
|
||||
for event in events:
|
||||
b_copy_done = True
|
||||
print(f"copy de l'événement: {event['SUMMARY']}")
|
||||
|
||||
|
||||
_title = event["SUMMARY"] if ("SUMMARY" in event) else "Sans Titre"
|
||||
_desc = event["DESCRIPTION"] if ("DESCRIPTION" in event) else ""
|
||||
|
||||
my_event = self.a_task.save_event(
|
||||
dtstart=event["DTSTART"].dt,
|
||||
dtend=event["DTEND"].dt,
|
||||
summary= _title,
|
||||
description= _desc,
|
||||
)
|
||||
if b_copy_done:
|
||||
self.set_today_as_synced()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# Create your models here.
|
||||
class cls_caldav():
|
||||
url = ""
|
||||
@@ -60,10 +129,7 @@ class cls_caldav():
|
||||
print(f"ICS CALL URL = {modified_url}")
|
||||
with urllib.request.urlopen(modified_url, context=ctx) as o_url:
|
||||
self.data = o_url.read()
|
||||
#o_url.close()
|
||||
|
||||
print(self.data)
|
||||
|
||||
|
||||
return self.data
|
||||
|
||||
def get_caldav_data(self,periode=1,calendar=None, date=None):
|
||||
@@ -191,13 +257,17 @@ class cls_caldav_client():
|
||||
caldav_password = "Agendamc144"
|
||||
|
||||
caldav_agenda_config_url = caldav_cfg["task_config"]
|
||||
|
||||
|
||||
events = None
|
||||
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self.client = caldav.DAVClient(url=self.caldav_url, username=self.caldav_user, password=self.caldav_password)
|
||||
|
||||
|
||||
|
||||
def init_caldav(self, caldav_url, caldav_user, caldav_password):
|
||||
client = caldav.DAVClient(url=caldav_url, username=caldav_user, password=caldav_password)
|
||||
self.data = self.client.principal()
|
||||
|
||||
for cal in self.data.calendars():
|
||||
@@ -212,7 +282,6 @@ class cls_caldav_client():
|
||||
self.a_rh = self.data.calendar(cal_id=caldav_id["rh"])
|
||||
self.a_manif = self.data.calendar(cal_id=caldav_id["manif"])
|
||||
|
||||
#self.get_events_by_date(self.a_team)
|
||||
def get_event_by_uuid(self,calandar,uuid):
|
||||
event = calandar.object_by_uid(uid=uuid)
|
||||
return event
|
||||
@@ -235,36 +304,9 @@ class cls_caldav_client():
|
||||
summary=event.vobject_instance.vevent.summary.value,
|
||||
description=_desc,
|
||||
)
|
||||
event.delete()
|
||||
|
||||
|
||||
|
||||
def init_task_management(self):
|
||||
print("pass copy task process")
|
||||
try:
|
||||
with open(os.path.join("mycaldav", "last_sync_config.bin"), "rb") as file:
|
||||
array = pickle.load(file)
|
||||
except:
|
||||
array = {"year": 0, "month": 0}
|
||||
print("Erreur lecture fichier config")
|
||||
|
||||
if array["month"] == datetime.now().month:
|
||||
if array["year"] == datetime.now().year:
|
||||
print("pas de copy, sync déjà fait")
|
||||
else:
|
||||
self.copy_caldav_data(self.a_task)
|
||||
else:
|
||||
self.copy_caldav_data(self.a_task)
|
||||
self.clear_all_events_by_Date(self.a_task)
|
||||
|
||||
|
||||
def add_event_in_calandar(self, calandar):
|
||||
my_event = calandar.save_event(
|
||||
dtstart=datetime(2022, 6, 30, 12),
|
||||
dtend=datetime(2022, 6, 30, 13),
|
||||
summary="Do the needful",
|
||||
|
||||
)
|
||||
event.delete()
|
||||
|
||||
|
||||
def get_events_by_date(self,calandar,date=None ):
|
||||
if date is None:
|
||||
date = datetime.now()
|
||||
@@ -330,104 +372,6 @@ class cls_caldav_client():
|
||||
|
||||
|
||||
|
||||
def copy_caldav_data(self, calandar=None):
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
|
||||
|
||||
o_url = urllib.request.urlopen(self.caldav_agenda_config_url, context=ctx)
|
||||
data = o_url.read()
|
||||
o_url.close()
|
||||
|
||||
now = f"01.{datetime.now().month}.{datetime.now().year}"
|
||||
|
||||
|
||||
next_month = datetime.strptime(f"1.{datetime.today().month}.{datetime.today().year}","%d.%m.%Y") + relativedelta(months=1)
|
||||
next_month = (next_month.year, next_month.month, next_month.day)
|
||||
|
||||
today = (datetime.now().year, datetime.now().month, 1)
|
||||
print(f"today:{today}")
|
||||
print(f"next:{next_month}")
|
||||
events = None
|
||||
events = recurring_ical_events.of(Calendar.from_ical(data)).between(today, next_month)
|
||||
cur_events = calandar.date_search(start=datetime(today[0],today[1],today[2]), end=datetime(next_month[0],next_month[1],next_month[2]), expand=True)
|
||||
|
||||
for event in events:
|
||||
print(f"copy de l'événement: {event['SUMMARY']}")
|
||||
|
||||
|
||||
_title = event["SUMMARY"] if ("SUMMARY" in event) else "Sans Titre"
|
||||
_desc = event["DESCRIPTION"] if ("DESCRIPTION" in event) else ""
|
||||
|
||||
my_event = calandar.save_event(
|
||||
dtstart=event["DTSTART"].dt,
|
||||
dtend=event["DTEND"].dt,
|
||||
summary= _title,
|
||||
description= _desc,
|
||||
)
|
||||
|
||||
with open(os.path.join("mycaldav", "last_sync_config.bin"), "wb") as file:
|
||||
array = {"year":datetime.now().year, "month":datetime.now().month}
|
||||
pickle.dump(array, file)
|
||||
|
||||
|
||||
|
||||
|
||||
def clear_all_events_by_Date(self, calandar):
|
||||
events = calandar.date_search(start=datetime(2000, 1, 1), end=datetime.now() - relativedelta(years=1), expand=True)
|
||||
for e in events:
|
||||
e.delete()
|
||||
#print(events)
|
||||
|
||||
def reformat_all_events(self,calandar):
|
||||
events = calandar.date_search(start=datetime(2023, 1, 1), end=datetime.now() + relativedelta(years=1),
|
||||
expand=True)
|
||||
|
||||
for event in events:
|
||||
summary = event.vobject_instance.vevent.summary.value
|
||||
print(f"test summary {summary}")
|
||||
if "----" in summary:
|
||||
summary = summary.replace("----", Key_separator)
|
||||
print(f"reformat: [{summary}]")
|
||||
event.vobject_instance.vevent.summary.value = summary
|
||||
event.save()
|
||||
elif "--" in summary:
|
||||
pass
|
||||
elif "-" in summary:
|
||||
summary = summary.replace("-",Key_separator)
|
||||
print(f"reformat: [{summary}]")
|
||||
event.vobject_instance.vevent.summary.value = summary
|
||||
event.save()
|
||||
|
||||
def change_utc_to_zurich_all_events(self,calandar):
|
||||
events = calandar.date_search(start=datetime(2023, 4, 1), end=datetime.now() + relativedelta(years=1),
|
||||
expand=True)
|
||||
|
||||
for event in events:
|
||||
start = event.vobject_instance.vevent.dtstart.value
|
||||
end = start + relativedelta(day=1)
|
||||
|
||||
if start.hour < 12:
|
||||
new_start = datetime(start.year, start.month, start.day,7,0,0, tzinfo = pytz.timezone("Europe/Zurich"))
|
||||
new_end = datetime(start.year, start.month, start.day, 19, 0, 0, tzinfo=pytz.timezone("Europe/Zurich"))
|
||||
elif start.hour > 12:
|
||||
new_start = datetime(start.year, start.month, start.day,19,0,0, tzinfo = pytz.timezone("Europe/Zurich"))
|
||||
new_end = datetime(start.year, start.month,start.day, 23, 59, 0, tzinfo=pytz.timezone("Europe/Zurich"))
|
||||
|
||||
event.vobject_instance.vevent.dtstart.value = new_start
|
||||
event.vobject_instance.vevent.dtend.value = new_end
|
||||
|
||||
|
||||
if new_end<new_start :
|
||||
print(f"{new_start}>{new_end}")
|
||||
print("ERROR")
|
||||
event.vobject_instance.vevent.dtend.value = new_start
|
||||
|
||||
|
||||
event.save()
|
||||
|
||||
|
||||
|
||||
class _caldav_item():
|
||||
|
@@ -13,3 +13,8 @@ for key,value in caldav_id.items():
|
||||
caldav_cfg[key] = f"https://sync.infomaniak.com/calendars/AA01593/{value}?export"
|
||||
|
||||
|
||||
CALDAV_URL = "https://sync.infomaniak.com"
|
||||
CALDAV_USER = 'AA01593' #agenda@ambulance-clerc.ch
|
||||
CALDAV_PASSWORD = "Agendamc144"
|
||||
|
||||
|
||||
|
@@ -27,6 +27,7 @@ from rest_framework import status
|
||||
from rest_framework.decorators import api_view
|
||||
|
||||
from mycaldav.models import CalDavItemSerializer, _caldav_item
|
||||
from mycaldav.settings import CALDAV_URL,CALDAV_USER,CALDAV_PASSWORD
|
||||
|
||||
import logging
|
||||
|
||||
@@ -34,7 +35,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def formatResponseArray(o_caldav, inverted=False):
|
||||
print(o_caldav.items)
|
||||
|
||||
try:
|
||||
if inverted:
|
||||
o_caldav.items.reverse()
|
||||
@@ -72,9 +73,10 @@ def view_task_caldav(request):
|
||||
o_caldav = cls_caldav(url=caldav_cfg["task"])
|
||||
o_caldav.get_caldav_data()
|
||||
|
||||
if (datetime.today().day == 1) or 2==1:
|
||||
myClient = cls_caldav_client()
|
||||
myClient.init_task_management()
|
||||
o_caldav_sync_management = caldav_sync_manager()
|
||||
o_caldav_sync_management.init_caldav(caldav_url=CALDAV_URL, caldav_user=CALDAV_USER, caldav_password=CALDAV_PASSWORD)
|
||||
o_caldav_sync_management.init_task_management()
|
||||
|
||||
|
||||
response = formatResponseArray(o_caldav)
|
||||
return Response(response["data"], status=response["status"])
|
||||
@@ -88,6 +90,7 @@ def view_task_edit_caldav(request):
|
||||
return JsonResponse({'error': 'UUID non fourni dans les données JSON'}, status=400)
|
||||
|
||||
myClient = cls_caldav_client()
|
||||
myClient.init_caldav(caldav_url=CALDAV_URL, caldav_user=CALDAV_USER, caldav_password=CALDAV_PASSWORD)
|
||||
myClient.mark_as_done_task(calandar=myClient.a_task,uuid=uuid)
|
||||
return JsonResponse({'sucsess': 'UUID marqué à done'}, status=200)
|
||||
|
||||
@@ -114,6 +117,7 @@ def view_vhc_edit_caldav(request):
|
||||
# Gérer le cas où l'UUID n'est pas fourni dans les données JSON
|
||||
return JsonResponse({'error': 'UUID non fourni dans les données JSON'}, status=400)
|
||||
myClient = cls_caldav_client()
|
||||
myClient.init_caldav(caldav_url=CALDAV_URL, caldav_user=CALDAV_USER, caldav_password=CALDAV_PASSWORD)
|
||||
myClient.mark_as_done_task(calandar=myClient.a_vhc,uuid=uuid)
|
||||
return JsonResponse({'sucsess': 'UUID marqué à done'}, status=200)
|
||||
|
||||
@@ -185,11 +189,7 @@ def view_op_caldav(request):
|
||||
@api_view(['GET'])
|
||||
def view_team_caldav(request):
|
||||
#Change timezone
|
||||
myclient = cls_caldav_client()
|
||||
#myclient.reformat_all_events(myclient.a_team)
|
||||
#myclient.change_utc_to_zurich_all_events(myclient.a_team)
|
||||
#myclient.reformat_all_events(myclient.a_task_config)
|
||||
#myclient.change_utc_to_zurich_all_events(myclient.a_task)
|
||||
|
||||
|
||||
o_caldav = cls_caldav(url=caldav_cfg["team"])
|
||||
o_caldav.get_caldav_data(periode=3)
|
||||
@@ -225,6 +225,7 @@ def view_team_pdf_caldav(request):
|
||||
start = datetime.strptime(request.GET['dt'], "%d.%m.%Y")
|
||||
|
||||
myClient = cls_caldav_client()
|
||||
myClient.init_caldav(caldav_url=CALDAV_URL, caldav_user=CALDAV_USER, caldav_password=CALDAV_PASSWORD)
|
||||
render_pdf(o_caldav, caldavClient=myClient, date=start)
|
||||
return FileResponse(open('mycaldav/export.pdf', 'rb'), as_attachment=False, content_type='application/pdf')
|
||||
|
||||
|
Reference in New Issue
Block a user