"""Celery tasks. """ from celery.task import PeriodicTask from celery.schedules import crontab from django.conf import settings import os from osmux import osmux class OSMFetcher(object): """ A mixin to get and store OpenStreetmap Data """ @property def bbox(self): raise NotImplementedError @property def store_name(self): raise NotImplementedError @property def tag(self): raise NotImplementedError value = '*' def run(self, **kwargs): fn = os.path.join(settings.MEDIA_ROOT, self.store_name) fn_new = os.path.join(settings.MEDIA_ROOT, 'new-%s' % self.store_name) with open(fn_new, 'wb') as output: output.write(osmux( self.tag, self.bbox[0], self.bbox[1], self.bbox[2], self.bbox[3], self.value)) os.rename(fn_new, fn) #class MateFetcher(OSMFetcher, PeriodicTask): # run_every = crontab(minute=0, hour=3) # bbox = (55.26, 46.52, 15.26, 5.22) # store_name = 'mate.xml' # tag = 'drink:club-mate'