Files
Reskreen/mycaldav/models.py
Kirosbr d927b69e7d
All checks were successful
Build and Push Docker Image / build (push) Successful in 3m50s
debug cron
2024-12-04 11:18:36 +01:00

455 lines
16 KiB
Python

from django.db import models
from datetime import datetime, timedelta, time
from django.contrib.auth.models import User
import datetime as Datetime
import pickle
import pytz
import os
from dateutil.relativedelta import relativedelta
from django.db import models
import urllib.request
import ssl
from icalendar import Calendar, Event
import recurring_ical_events
import caldav
from mycaldav.settings import *
import pytz
from dateutil.parser import parse
Key_separator = "--"
class caldav_sync_manager(models.Model):
    """Record of the dates for which template events were already copied.

    One row is stored per processed date (``dtDate`` is unique).
    ``init_task_management`` uses the presence of a row to decide whether
    ``copy_caldav_data`` still has to run for that date.
    """

    dtDate = models.DateField('Date_synchronized', unique=True)
    bDone = models.BooleanField("Effectuée", default=False)
    dtUpdated = models.DateTimeField('date updated', auto_now=True)
    dtCreated = models.DateTimeField('date published', auto_now_add=True)

    def __str__(self):
        return f"{self.dtDate}"

    def init_caldav(self, caldav_url, caldav_user, caldav_password):
        """Connect to the CalDAV server and keep the task calendar.

        The calendar is stored in ``self.a_task``; ``copy_caldav_data``
        writes to it, so this method has to be called first.
        """
        client = caldav.DAVClient(url=caldav_url, username=caldav_user, password=caldav_password)
        data = client.principal()
        # caldav_id is not defined in this file; presumably it comes from
        # the star import of mycaldav.settings.
        self.a_task = data.calendar(cal_id=caldav_id["task"])

    def set_today_as_synced(self, date):
        """Persist a new row marking ``date`` as processed."""
        o_new_manager = caldav_sync_manager()
        o_new_manager.dtDate = date
        o_new_manager.save()

    def init_task_management(self):
        """Run the copy for today and for the same day one month ago.

        Each of the two dates is processed only when no row exists for it
        yet, so repeated calls on the same day do not duplicate events.
        """
        today = datetime.now()
        o_caldav_sync_management = caldav_sync_manager.objects.filter(dtDate=today.date())
        if not o_caldav_sync_management.exists():
            self.copy_caldav_data(today)
            print(f"synced events: {today}")
        else:
            print("Aujourd'hui: pas de copy, sync déjà fait")

        last_month = today - relativedelta(months=1)
        # Single quotes inside the replacement field: reusing the outer
        # quote character is a SyntaxError before Python 3.12.
        print(f"test du mois précédant: {last_month.strftime('%d/%m/%Y')}")
        o_caldav_sync_management = caldav_sync_manager.objects.filter(dtDate=last_month.date())
        if not o_caldav_sync_management.exists():
            self.copy_caldav_data(last_month)
            print(f"synced events: {last_month}")
        else:
            print("Mois précédant: pas de copy, sync déjà fait")

    def copy_caldav_data(self, today):
        """Copy the template events occurring one month after ``today``.

        Downloads the ICS export behind ``caldav_cfg["task_config"]``,
        expands recurrences, saves every event found on the target day to
        ``self.a_task`` and finally records ``today`` as processed.

        Args:
            today: reference ``datetime``; the copied day is ``today`` plus
                one month.
        """
        # NOTE(review): certificate and hostname verification are disabled,
        # so the server is not authenticated. Confirm this is intentional.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        seeked_date = today + relativedelta(months=1)
        # The requested window ends one day after the target day.
        sync_date = today + relativedelta(months=1, days=1)
        modified_url = caldav_cfg["task_config"] + f"&start={int(today.timestamp())}&end={int(sync_date.timestamp())}&expand=1"
        with urllib.request.urlopen(modified_url, context=ctx) as o_url:
            sabre_data = o_url.read()

        events = recurring_ical_events.of(Calendar.from_ical(sabre_data)).at(
            (seeked_date.year, seeked_date.month, seeked_date.day)
        )
        for event in events:
            # Resolve the fallbacks before logging so that an event without
            # SUMMARY does not raise KeyError in the print below.
            _title = event["SUMMARY"] if ("SUMMARY" in event) else "Sans Titre"
            _desc = event["DESCRIPTION"] if ("DESCRIPTION" in event) else ""
            print(f"copy de l'événement: {_title}")
            self.a_task.save_event(
                dtstart=event["DTSTART"].dt,
                dtend=event["DTEND"].dt,
                summary=_title,
                description=_desc,
            )
        self.set_today_as_synced(today)
# Create your models here.
class cls_caldav():
    """Download an ICS export and turn its events into ``_caldav_item`` lists.

    After ``get_caldav_data`` (or ``parse_data``) has run, ``items`` holds
    every parsed event, ``day`` those starting before 19:00 and ``night``
    those starting at 19:00 or later.
    """

    # Class-level defaults are kept for backward compatibility; __init__
    # rebinds the lists so that instances never share the same objects.
    url = ""
    data = None
    items = []
    day = []
    night = []

    def __init__(self, url=""):
        self.url = url
        self.data = None
        self.items = []
        self.day = []
        self.night = []

    def clear_data(self):
        """Drop the downloaded payload and every parsed item."""
        self.data = None
        self.items = []
        self.day = []
        self.night = []

    def sort_array(self):
        """Sort ``items`` in ascending order of their start stamp."""
        self.items.sort(key=lambda x: x.dtstamp, reverse=False)

    def sort_array_by_key(self, reverse=False):
        """Sort ``items``, ``day`` and ``night`` by item key."""
        self.items.sort(key=lambda x: x.key, reverse=reverse)
        self.day.sort(key=lambda x: x.key, reverse=reverse)
        self.night.sort(key=lambda x: x.key, reverse=reverse)

    def caldav_open_url(self, days_delta, dest_date):
        """Download the ICS data for ``dest_date`` +/- ``days_delta`` days.

        Args:
            days_delta: number of days requested on each side of the date.
            dest_date: a ``datetime`` or a string formatted as ``%y-%m-%d``.

        Returns:
            The raw bytes read from the URL (also stored in ``self.data``).
        """
        # NOTE(review): certificate and hostname verification are disabled,
        # so the server is not authenticated. Confirm this is intentional.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.clear_data()
        if not isinstance(dest_date, datetime):
            dest_date = datetime.strptime(dest_date, '%y-%m-%d')
        window_start = int((dest_date - timedelta(days=days_delta)).timestamp())
        window_end = int((dest_date + timedelta(days=days_delta)).timestamp())
        modified_url = self.url + f"&start={window_start}&end={window_end}&expand=1"
        print(f"ICS CALL URL = {modified_url}")
        with urllib.request.urlopen(modified_url, context=ctx) as o_url:
            self.data = o_url.read()
        return self.data

    def get_caldav_data(self, periode=1, calendar=None, date=None):
        """Download and parse the events around ``date``.

        Args:
            periode: 1 selects the given day, 2 the day and the next one,
                3 from two days before to two days after, 7 the next seven
                days. Any other value yields no events.
            calendar: unused; kept for interface compatibility.
            date: reference ``datetime``; defaults to now.
        """
        if date is None:
            date = datetime.now()
        self.caldav_open_url(days_delta=periode, dest_date=date.strftime("%y-%m-%d"))
        today = (date.year, date.month, date.day)
        occurrences = recurring_ical_events.of(Calendar.from_ical(self.data))
        if periode == 1:
            events = occurrences.at(today)
        elif periode == 2:
            events = occurrences.between(today, date + timedelta(days=1))
        elif periode == 3:
            window_start = date + timedelta(days=-2)
            window_end = date + timedelta(days=2)
            print(f"affichage: {window_start} <> {window_end}")
            events = occurrences.between(window_start, window_end)
        elif periode == 7:
            events = occurrences.between(today, date + timedelta(days=7))
        else:
            # Previously this left events as None and crashed in parse_data.
            events = []
        self.parse_data(events)

    def convert_to_gmt1(self, dt):
        """Convert ``dt`` to the Europe/Paris time zone.

        Aware datetimes are converted; naive datetimes are interpreted as
        UTC first. Anything that cannot be converted (for example a plain
        ``date`` from an all-day event) is returned unchanged.
        """
        gmt1_tz = pytz.timezone('Europe/Paris')
        if not isinstance(dt, datetime):
            return dt
        try:
            if dt.tzinfo is not None:
                return dt.astimezone(gmt1_tz)
            return pytz.utc.localize(dt).astimezone(gmt1_tz)
        except Exception:
            # Best effort: keep the original value when conversion fails.
            return dt

    @staticmethod
    def _apply_summary_markers(item):
        """Split ``item.name`` into key, display name and crew fields."""
        if Key_separator not in item.name:
            return
        parts = item.name.split(Key_separator)
        item.key = parts[0]
        if "$" in item.key:
            item.team_transfert = True
            item.key = item.key.replace("$", "")
        if "Manif" in item.key:
            item.team_manif = True
            item.key = item.key.replace("Manif", "")
        item.name = parts[1]
        if "&" in item.name:
            crew = item.name.split("&")
            item.team_1 = crew[0]
            if "#" in item.team_1:
                item.team_1_chef = True
                item.team_1 = item.team_1.replace("#", "")
            item.team_2 = crew[1]
            if "#" in item.team_2:
                item.team_2_chef = True
                item.team_2 = item.team_2.replace("#", "")

    @staticmethod
    def _apply_description_markers(item, event):
        """Copy the description and extract the ``#Fait`` / ``{href=}`` tags."""
        if "DESCRIPTION" not in event.keys():
            return
        item.desc = f"{event['DESCRIPTION']}"
        if "#Fait" in item.desc:
            item.done = True
            item.desc = item.desc.replace("#Fait", "")
        if '{href=' in item.desc:
            item.href = item.desc.split('{href=')[1].split('}')[0]
            item.desc = item.desc.replace("{href=" + item.href + "}", "")

    def parse_data(self, events):
        """Append one ``_caldav_item`` per event to ``items`` and ``day``/``night``.

        Args:
            events: iterable of icalendar events providing at least UID,
                DTSTART and DTEND.

        Returns:
            ``self.data`` (the raw payload), unchanged.
        """
        stamp_format = "%d.%m.%Y %H:%M"
        for event in events:
            item = _caldav_item()
            # Same fallback title as the one used when copying events.
            item.name = event["SUMMARY"] if "SUMMARY" in event else "Sans Titre"
            item.uuid = f"{event['UID']}"
            self._apply_summary_markers(item)
            self._apply_description_markers(item, event)

            local_start = self.convert_to_gmt1(event['DTSTART'].dt)
            print(f"gmt= {local_start}")
            item.dtstart = local_start.strftime(stamp_format)
            item.dtstamp = int(local_start.strftime("%Y%m%d%H%M"))
            item.dtend = self.convert_to_gmt1(event['DTEND'].dt).strftime(stamp_format)
            item.format_str_date()
            self.items.append(item)

            start = datetime.strptime(item.dtstart, stamp_format)
            print(f"Parse Data {item.name} start:{start}")
            # Events starting at 19:00 or later belong to the night list.
            if start.hour < 19:
                self.day.append(item)
            else:
                print("add to night ****")
                self.night.append(item)
        self.sort_array()
        return self.data
class cls_caldav_client():
    """CalDAV client used to look up, rewrite and mark calendar events."""

    caldav_url = "https://sync.infomaniak.com"
    # SECURITY: credentials should not live in source control. They can be
    # overridden through the environment; the literals remain only as a
    # backward-compatible fallback and should be rotated and removed.
    caldav_user = os.environ.get("CALDAV_USER", 'AA01593')  # agenda@ambulance-clerc.ch
    caldav_password = os.environ.get("CALDAV_PASSWORD", "Agendamc144")
    caldav_agenda_config_url = caldav_cfg["task_config"]
    events = None

    def init_caldav(self, caldav_url, caldav_user, caldav_password):
        """Connect to the server and bind every calendar listed in ``caldav_id``.

        The principal is stored in ``self.data`` and each calendar in an
        ``a_<name>`` attribute. The available calendars are printed.
        """
        client = caldav.DAVClient(url=caldav_url, username=caldav_user, password=caldav_password)
        self.data = client.principal()
        for cal in self.data.calendars():
            print(f"name: {cal.name}({cal.url})")
        self.a_task = self.data.calendar(cal_id=caldav_id["task"])
        self.a_team = self.data.calendar(cal_id=caldav_id["team"])
        self.a_vhc = self.data.calendar(cal_id=caldav_id["vhc"])
        self.a_op = self.data.calendar(cal_id=caldav_id["op"])
        self.a_task_config = self.data.calendar(cal_id=caldav_id["task_config"])
        self.a_road = self.data.calendar(cal_id=caldav_id["road"])
        self.a_rh = self.data.calendar(cal_id=caldav_id["rh"])
        self.a_manif = self.data.calendar(cal_id=caldav_id["manif"])

    def get_event_by_uuid(self, calandar, uuid):
        """Return the event of ``calandar`` whose UID is ``uuid``."""
        return calandar.object_by_uid(uid=uuid)

    def mark_as_done_task(self, calandar, uuid):
        """Replace the event with a copy whose description ends with ``#Fait``.

        A new event carrying the same start, end and summary is saved and
        the original event is deleted afterwards.
        """
        event = self.get_event_by_uuid(calandar=calandar, uuid=uuid)
        vevent = event.vobject_instance.vevent
        try:
            _desc = vevent.description.value + "#Fait"
            # str.replace returns a new string; the result must be kept,
            # otherwise an already-done task ends up tagged twice.
            _desc = _desc.replace("#Fait#Fait", "#Fait")
        except AttributeError:
            # The event has no description at all.
            _desc = "#Fait"
        calandar.save_event(
            dtstart=vevent.dtstart.value,
            dtend=vevent.dtend.value,
            summary=vevent.summary.value,
            description=_desc,
        )
        event.delete()

    def get_events_by_date(self, calandar, date=None):
        """Load the events of the seven days from ``date`` and normalise them.

        The events are stored in ``self.events`` and each one is passed to
        ``format_data_event``. ``date`` defaults to now.
        """
        if date is None:
            date = datetime.now()
        range_end = date + timedelta(days=7)
        self.events = calandar.date_search(
            start=datetime(date.year, date.month, date.day),
            end=datetime(range_end.year, range_end.month, range_end.day),
            expand=False,
        )
        for event in self.events:
            self.format_data_event(event)
        return

    def get_lastName_with_MC(self, search):
        """Return ``"<last_name> <first initial>."`` for user ``mc<search>@clerc.ch``.

        Raises:
            User.DoesNotExist: when no such user exists.
            IndexError: when the user has an empty first name.
        """
        print(f"search équipage 1: {search}")
        user = User.objects.get(username=f"mc{search}@clerc.ch")
        last_name = f"{user.last_name} {user.first_name[0]}."
        return last_name

    def _expand_crew_member(self, raw, label, summary):
        """Expand a short crew code into ``MC<code> <name>``.

        Returns ``(text, changed)``. ``text`` is ``None`` when the user
        lookup failed; entries longer than five characters are considered
        already expanded and are returned unchanged.
        """
        if len(raw.strip()) > 5:
            print(f"{label} Already with MC {len(raw.strip())}[{raw.strip()}]")
            return raw, False
        raw = raw.strip().lower().replace("mc", "")
        code = raw.strip().replace('#', '')
        try:
            last_name = self.get_lastName_with_MC(code)
        except Exception:
            # Best effort: an unknown or incomplete user aborts the rewrite.
            print(f"Error: mc{code}@clerc.ch @{summary}")
            return None, False
        return raw.replace(code, f"MC{code} {last_name}"), True

    def format_data_event(self, event):
        """Rewrite ``<key>--<a>&<b>`` summaries so crew codes carry names.

        Returns:
            ``False`` when the summary lacks the key separator or ``&``,
            otherwise ``None`` (also when a user lookup failed, in which
            case the event is left untouched).
        """
        summary = event.vobject_instance.vevent.summary.value
        if Key_separator not in summary:
            return False
        key = summary.split(Key_separator)
        if "&" not in key[1]:
            return False
        # "&" is present, so the split always yields at least two entries.
        equipage = key[1].split("&")
        print(f"{event.vobject_instance.vevent.dtstart.value} check équipage({key[0]}) {key[1]}")

        first, first_changed = self._expand_crew_member(equipage[0], "T1", summary)
        if first is None:
            return None
        second, second_changed = self._expand_crew_member(equipage[1], "T2", summary)
        if second is None:
            return None

        if first_changed or second_changed:
            event.vobject_instance.vevent.summary.value = f"{key[0]} {Key_separator} {first.strip()} & {second.strip()}"
            event.save()
class _caldav_item():
def __init__(self, key="", name="", desc="", dtstart=None, str_start_date="",
str_start_time="", dtend=None, str_end_date="", str_end_time="",
dtstamp=None, done=False, href="", uuid="", team_1="",
team_2="", team_1_chef=False, team_2_chef=False,
team_transfert=False, team_manif=False):
self.key = key
self.name = name
self.desc = desc
self.dtstart = dtstart
self.str_start_date = str_start_date
self.str_start_time = str_start_time
self.dtend = dtend
self.str_end_date = str_end_date
self.str_end_time = str_end_time
self.dtstamp = dtstamp
self.done = done
self.href = href
self.uuid = uuid
self.team_1 = team_1
self.team_2 = team_2
self.team_1_chef = team_1_chef
self.team_2_chef = team_2_chef
self.team_transfert = team_transfert
self.team_manif = team_manif
def format_str_date(self):
self.str_start_date = datetime.strptime(self.dtstart,"%d.%m.%Y %H:%M").strftime("%d.%m")
self.str_start_time = datetime.strptime(self.dtstart, "%d.%m.%Y %H:%M").strftime("%H:%M")
self.str_end_date = datetime.strptime(self.dtend,"%d.%m.%Y %H:%M").strftime("%d.%m")
self.str_end_time = datetime.strptime(self.dtend, "%d.%m.%Y %H:%M").strftime("%H:%M")
from rest_framework import serializers
class CalDavItemSerializer(serializers.Serializer):
    """Serializer exposing the fields of a ``_caldav_item``.

    ``dtstamp`` is not exposed. ``desc``, ``href``, ``team_1`` and
    ``team_2`` accept blank strings.
    """

    # Identification and free text.
    key = serializers.CharField()
    name = serializers.CharField()
    desc = serializers.CharField(allow_blank=True)

    # Start of the event: full stamp, then its date and time parts.
    dtstart = serializers.CharField()
    str_start_date = serializers.CharField()
    str_start_time = serializers.CharField()

    # End of the event, same layout as the start.
    dtend = serializers.CharField()
    str_end_date = serializers.CharField()
    str_end_time = serializers.CharField()

    # Completion flag, link and unique id.
    done = serializers.BooleanField()
    href = serializers.CharField(allow_blank=True)
    uuid = serializers.CharField()

    # Crew members and their flags.
    team_1 = serializers.CharField(allow_blank=True)
    team_2 = serializers.CharField(allow_blank=True)
    team_1_chef = serializers.BooleanField()
    team_2_chef = serializers.BooleanField()
    team_transfert = serializers.BooleanField()
    team_manif = serializers.BooleanField()