Added celerybeat

This commit is contained in:
Fergal Moran
2015-09-20 19:30:16 +01:00
parent 261e547202
commit d446711391
7 changed files with 86 additions and 18 deletions

BIN
celerybeat-schedule Normal file

Binary file not shown.

View File

@@ -1,2 +1,3 @@
from __future__ import absolute_import
from .celeryconf import app as celery_app
from .celery import app as celery_app

View File

@@ -1,16 +1,14 @@
from __future__ import absolute_import
import os
import logging
from celery import Celery
from celery.schedules import crontab
from spa import tasks
logger = logging.getLogger('dss')
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dss.settings')
from django.conf import settings
app = Celery('dss')
# Using a string here means the worker will not have to
@@ -18,8 +16,9 @@ app = Celery('dss')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
"""
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, tasks.play_pending_audio.s('hello'), name='add every 10')
"""

10
dss/celerysettings.py Normal file
View File

@@ -0,0 +1,10 @@
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'spa.tasks.play_pending_audio',
'schedule': timedelta(seconds=10)
},
}
CELERY_TIMEZONE = 'UTC'

View File

@@ -12,6 +12,7 @@ from dss.storagesettings import *
from dss.paymentsettings import *
from dss.logsettings import *
from dss.psa import *
from dss.celerysettings import *
DEVELOPMENT = DEBUG

View File

@@ -2,16 +2,55 @@ from gzip import GzipFile
import subprocess
from django.core.management.base import LabelCommand, CommandError
from subprocess import Popen, PIPE, STDOUT
from dropbox.rest import ErrorResponse
import pexpect
from dss import settings
import tarfile
import dropbox
import os, time
from dropbox.client import ChunkedUploader
""" Monkey patch dropbox upload chunked """
def __upload_chunked(self, chunk_size = 4 * 1024 * 1024):
"""Uploads data from this ChunkedUploader's file_obj in chunks, until
an error occurs. Throws an exception when an error occurs, and can
be called again to resume the upload.
Parameters
chunk_size
The number of bytes to put in each chunk. (Default 4 MB.)
"""
while self.offset < self.target_length:
next_chunk_size = min(chunk_size, self.target_length - self.offset)
if self.last_block == None:
self.last_block = self.file_obj.read(next_chunk_size)
try:
(self.offset, self.upload_id) = self.client.upload_chunk(
self.last_block, next_chunk_size, self.offset, self.upload_id)
self.last_block = None
except ErrorResponse as e:
# Handle the case where the server tells us our offset is wrong.
must_reraise = True
if e.status == 400:
reply = e.body
if "offset" in reply and reply['offset'] != 0 and reply['offset'] > self.offset:
self.last_block = None
self.offset = reply['offset']
must_reraise = False
if must_reraise:
raise
ChunkedUploader.upload_chunked = __upload_chunked
def _backup_database():
print("Creating database backup")
file_name = "{0}.sql".format(time.strftime("%Y%m%d-%H%M%S"))
file_name = "{}.sql".format(time.strftime("%Y%m%d-%H%M%S"))
backup_file = os.path.join(settings.DSS_TEMP_PATH, file_name)
print('Backing up {} database to {}'.format(settings.DATABASE_NAME, file_name))
@@ -25,13 +64,16 @@ def _backup_database():
child.sendline(settings.DATABASE_PASSWORD)
child.expect(pexpect.EOF)
_create_backup_bundle("{0}.tar.gz".format(file_name), 'database', backup_file)
zip_name = "{0}.tar.gz".format(file_name)
archive = _create_backup_bundle(zip_name, backup_file)
_upload_to_dropbox('database', archive, zip_name)
def _backup_settings():
print("Creating settings backup")
file_name = "{0}.tar.gz".format(time.strftime("%Y%m%d-%H%M%S"))
_create_backup_bundle(file_name, 'settings', settings.PROJECT_ROOT)
zip_name = "{0}.tar.gz".format(time.strftime("%Y%m%d-%H%M%S"))
tar_file = _create_backup_bundle(zip_name, settings.PROJECT_ROOT)
_upload_to_dropbox('settings', tar_file, "{}.tar.gz".format(zip_name))
def _progress_filter(tarinfo):
@@ -39,14 +81,13 @@ def _progress_filter(tarinfo):
return tarinfo
def _create_backup_bundle(remote_file, type, location):
def _create_backup_bundle(remote_file, location):
backup_file = "{0}/{1}".format(settings.DSS_TEMP_PATH, remote_file)
tar = tarfile.open(backup_file, "w:gz")
tar.add(location)
tar.close()
_upload_to_dropbox(type, backup_file, remote_file)
return backup_file
def _upload_to_dropbox(type, backup_file, remote_file):
@@ -57,7 +98,6 @@ def _upload_to_dropbox(type, backup_file, remote_file):
response = client.put_file("{0}/{1}".format(type, remote_file), f, overwrite=True)
os.remove(backup_file)
print(response)
except Exception as ex:
print(ex)
@@ -66,7 +106,20 @@ def _upload_to_dropbox(type, backup_file, remote_file):
def _backup_media():
print("Creating media backup")
file_name = "{0}.tar.gz".format(time.strftime("%Y%m%d-%H%M%S"))
_create_backup_bundle(file_name, 'media', settings.MEDIA_ROOT)
archive = _create_backup_bundle(file_name, settings.MEDIA_ROOT)
size = os.path.getsize(archive)
upload_file = open(archive, 'rb')
client = dropbox.client.DropboxClient(settings.DSS_DB_BACKUP_TOKEN)
uploader = client.get_chunked_uploader(upload_file, size)
while uploader.offset < size:
try:
upload = uploader.upload_chunked()
except Exception as e:
print("Error uploading: {0}".format(e))
uploader.finish('/media/{}'.format(file_name))
class Command(LabelCommand):

View File

@@ -1,6 +1,7 @@
from celery.task import task
import os
import logging
import json
import requests
from core.realtime import activity
@@ -67,7 +68,10 @@ def notify_subscriber(session_id, uid):
@task
def play_pending_audio():
m = Mix.objects.order_by('uid').first()
m = Mix.objects.order_by('?').first()
print("Playing: {}".format(m.title))
r = requests.post('http://localhost:8888/a/play', data={'audio_file:': m.get_stream_url()})
data = {'audio_file': m.get_stream_url()}
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post('http://localhost:8888/a/play', data=json.dumps(data), headers=headers)
print(r.text)