From d44671139154f5869612e9c6abee560e41f754ef Mon Sep 17 00:00:00 2001 From: Fergal Moran Date: Sun, 20 Sep 2015 19:30:16 +0100 Subject: [PATCH] Added celerybeat --- celerybeat-schedule | Bin 0 -> 12747 bytes dss/__init__.py | 3 +- dss/{celeryconf.py => celery.py} | 11 +++-- dss/celerysettings.py | 10 +++++ dss/settings.py | 1 + spa/management/commands/backup.py | 71 ++++++++++++++++++++++++++---- spa/tasks.py | 8 +++- 7 files changed, 86 insertions(+), 18 deletions(-) create mode 100644 celerybeat-schedule rename dss/{celeryconf.py => celery.py} (85%) create mode 100644 dss/celerysettings.py diff --git a/celerybeat-schedule b/celerybeat-schedule new file mode 100644 index 0000000000000000000000000000000000000000..de739906d8e9c377aa89744f1655620aa00a4b45 GIT binary patch literal 12747 zcmeI2&uSA<6vl6we`acG+r(OJtm3Ark|CiAE`_3-po_7^SvlOyT+GnvBq#SqOeK;{ zseJ_>KwOG1px{abU%-X<0`3Jt&&|x(fNrW-sGJO$?`G!a%$Z-#`M3l5@a$fR%`!HA zqi8L}`dsS`y<_ZKB(x_2M1Tko0U|&IhyW2F0z`la5CI}U1dbzti*eGV5eVWy95ex{ z1F8e61L_0v0rCOz0rCOz0rCOz0rG+W%m=2Tu4ctpSv?wDn~C&&BRTQ+n!ym?@%6h% zbUdO$9EihYe^5OfO+9>k_j$!}9*Qfm|4rj%_1~h1|IzR#&crz)Kpcp}B!OtWbpO?I zU%5_Y!k=k9dBBPMGh#s;{)_|P|56g6>fMU_E01BGq-gzh;TwMYY3NG3KeC zv-LCfVdMPk`T95ag&d`az)ZC>O5177a&u!HKWe`fA#Vl38>OF`dU@QFekXJsoR-6B zYi7f2T*sM{QL1(sPjSzy37kKut<`H%xSi0GkaArS;4!l$xH9g#ZQ=C;aT8M*7%)|B zr|b%TRd%_lxZILvH{b)OD?-l?A2_`4`5j2R9#=y7+rs!cfXup)|5PZ?Y#5h!jzp~;#?lkI_F!Sw!}JveDWadikYLpY^i)`B^E z0W(bQg1cLooZ|+9hy5a<};gbp|lHS3r;uLulcyWPN;axfQ71!&Bp3* tW~^6Xajci&Y#TQP=PX!SZDSf0TX&{-)b)Ep!ui{8UPuMY7F_6?KL8SmXfgl* literal 0 HcmV?d00001 diff --git a/dss/__init__.py b/dss/__init__.py index 9f69479..7dec5b1 100755 --- a/dss/__init__.py +++ b/dss/__init__.py @@ -1,2 +1,3 @@ +from __future__ import absolute_import -from .celeryconf import app as celery_app +from .celery import app as celery_app diff --git a/dss/celeryconf.py b/dss/celery.py similarity index 85% rename from dss/celeryconf.py rename to dss/celery.py index 746d14e..cc2aa5c 100644 --- a/dss/celeryconf.py +++ b/dss/celery.py @@ -1,16 +1,14 @@ +from __future__ import absolute_import + import os -import logging from celery import Celery -from celery.schedules import crontab -from spa import tasks - -logger = logging.getLogger('dss') # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dss.settings') from django.conf import settings + app = Celery('dss') # Using a string here means the worker will not have to @@ -18,8 +16,9 @@ app = Celery('dss') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) - +""" @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, tasks.play_pending_audio.s('hello'), name='add every 10') +""" diff --git a/dss/celerysettings.py b/dss/celerysettings.py new file mode 100644 index 0000000..44d1c83 --- /dev/null +++ b/dss/celerysettings.py @@ -0,0 +1,10 @@ +from datetime import timedelta + +CELERYBEAT_SCHEDULE = { + 'add-every-30-seconds': { + 'task': 'spa.tasks.play_pending_audio', + 'schedule': timedelta(seconds=10) + }, +} + +CELERY_TIMEZONE = 'UTC' \ No newline at end of file diff --git a/dss/settings.py b/dss/settings.py index fba4276..5455376 100755 --- a/dss/settings.py +++ b/dss/settings.py @@ -12,6 +12,7 @@ from dss.storagesettings import * from dss.paymentsettings import * from dss.logsettings import * from dss.psa import * +from dss.celerysettings import * DEVELOPMENT = DEBUG diff --git a/spa/management/commands/backup.py b/spa/management/commands/backup.py index 07af8b5..7567349 100644 --- a/spa/management/commands/backup.py +++ b/spa/management/commands/backup.py @@ -2,16 +2,55 @@ from gzip import GzipFile import subprocess from django.core.management.base import LabelCommand, CommandError from subprocess import Popen, PIPE, STDOUT +from dropbox.rest import ErrorResponse import pexpect from dss import settings import tarfile import dropbox import os, time +from dropbox.client import ChunkedUploader + +""" Monkey patch dropbox upload chunked """ + + +def __upload_chunked(self, chunk_size = 4 * 1024 * 1024): + """Uploads data from this ChunkedUploader's file_obj in chunks, until + an error occurs. Throws an exception when an error occurs, and can + be called again to resume the upload. + + Parameters + chunk_size + The number of bytes to put in each chunk. (Default 4 MB.) + """ + + while self.offset < self.target_length: + next_chunk_size = min(chunk_size, self.target_length - self.offset) + if self.last_block == None: + self.last_block = self.file_obj.read(next_chunk_size) + + try: + (self.offset, self.upload_id) = self.client.upload_chunk( + self.last_block, next_chunk_size, self.offset, self.upload_id) + self.last_block = None + except ErrorResponse as e: + # Handle the case where the server tells us our offset is wrong. + must_reraise = True + if e.status == 400: + reply = e.body + if "offset" in reply and reply['offset'] != 0 and reply['offset'] > self.offset: + self.last_block = None + self.offset = reply['offset'] + must_reraise = False + if must_reraise: + raise + +ChunkedUploader.upload_chunked = __upload_chunked + def _backup_database(): print("Creating database backup") - file_name = "{0}.sql".format(time.strftime("%Y%m%d-%H%M%S")) + file_name = "{}.sql".format(time.strftime("%Y%m%d-%H%M%S")) backup_file = os.path.join(settings.DSS_TEMP_PATH, file_name) print('Backing up {} database to {}'.format(settings.DATABASE_NAME, file_name)) @@ -25,13 +64,16 @@ def _backup_database(): child.sendline(settings.DATABASE_PASSWORD) child.expect(pexpect.EOF) - _create_backup_bundle("{0}.tar.gz".format(file_name), 'database', backup_file) + zip_name = "{0}.tar.gz".format(file_name) + archive = _create_backup_bundle(zip_name, backup_file) + _upload_to_dropbox('database', archive, zip_name) def _backup_settings(): print("Creating settings backup") - file_name = "{0}.tar.gz".format(time.strftime("%Y%m%d-%H%M%S")) - _create_backup_bundle(file_name, 'settings', settings.PROJECT_ROOT) + zip_name = "{0}.tar.gz".format(time.strftime("%Y%m%d-%H%M%S")) + tar_file = _create_backup_bundle(zip_name, settings.PROJECT_ROOT) + _upload_to_dropbox('settings', tar_file, "{}.tar.gz".format(zip_name)) def _progress_filter(tarinfo): @@ -39,14 +81,13 @@ def _progress_filter(tarinfo): return tarinfo -def _create_backup_bundle(remote_file, type, location): +def _create_backup_bundle(remote_file, location): backup_file = "{0}/{1}".format(settings.DSS_TEMP_PATH, remote_file) tar = tarfile.open(backup_file, "w:gz") tar.add(location) tar.close() - - _upload_to_dropbox(type, backup_file, remote_file) + return backup_file def _upload_to_dropbox(type, backup_file, remote_file): @@ -57,7 +98,6 @@ def _upload_to_dropbox(type, backup_file, remote_file): response = client.put_file("{0}/{1}".format(type, remote_file), f, overwrite=True) os.remove(backup_file) - print(response) except Exception as ex: print(ex) @@ -66,7 +106,20 @@ def _upload_to_dropbox(type, backup_file, remote_file): def _backup_media(): print("Creating media backup") file_name = "{0}.tar.gz".format(time.strftime("%Y%m%d-%H%M%S")) - _create_backup_bundle(file_name, 'media', settings.MEDIA_ROOT) + archive = _create_backup_bundle(file_name, settings.MEDIA_ROOT) + + size = os.path.getsize(archive) + upload_file = open(archive, 'rb') + + client = dropbox.client.DropboxClient(settings.DSS_DB_BACKUP_TOKEN) + uploader = client.get_chunked_uploader(upload_file, size) + while uploader.offset < size: + try: + upload = uploader.upload_chunked() + except Exception as e: + print("Error uploading: {0}".format(e)) + + uploader.finish('/media/{}'.format(file_name)) class Command(LabelCommand): diff --git a/spa/tasks.py b/spa/tasks.py index db53096..5a7f739 100755 --- a/spa/tasks.py +++ b/spa/tasks.py @@ -1,6 +1,7 @@ from celery.task import task import os import logging +import json import requests from core.realtime import activity @@ -67,7 +68,10 @@ def notify_subscriber(session_id, uid): @task def play_pending_audio(): - m = Mix.objects.order_by('uid').first() + m = Mix.objects.order_by('?').first() print("Playing: {}".format(m.title)) - r = requests.post('http://localhost:8888/a/play', data={'audio_file:': m.get_stream_url()}) + + data = {'audio_file': m.get_stream_url()} + headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} + r = requests.post('http://localhost:8888/a/play', data=json.dumps(data), headers=headers) print(r.text)