diff --git a/ictv/database.py b/ictv/database.py index 955f0a0..dec1fe0 100644 --- a/ictv/database.py +++ b/ictv/database.py @@ -109,7 +109,11 @@ def update_database(): DBVersion(version=database_version) db_version = DBVersion.select().getOne().version if db_version < 1: - # Do upgrade stuff + print('Updating database to version %d' % 1) + column_sql = Asset.sqlmeta.getColumns()['last_update'].sqliteCreateSQL() + table = PluginChannel.sqlmeta.table + assert conn.queryOne('ALTER TABLE %s ADD %s' % (table, column_sql)) is None + db_version = 1 pass DBVersion.select().getOne().set(version=db_version) diff --git a/ictv/models/asset.py b/ictv/models/asset.py index c04a3f7..fca8f58 100644 --- a/ictv/models/asset.py +++ b/ictv/models/asset.py @@ -37,6 +37,7 @@ class Asset(ICTVObject): extension = StringCol(default=None) file_size = BigIntCol(default=None) # File size in kilobytes created = DateTimeCol(default=DateTimeCol.now) + last_update = DateTimeCol(default=DateTimeCol.now) last_reference = DateTimeCol(default=DateTimeCol.now) in_flight = BoolCol(default=False) # Is this asset being cached at the moment is_cached = BoolCol(default=False) # Is this asset a cached asset from CacheManager @@ -68,4 +69,5 @@ def on_asset_deleted(instance, kwargs): # TODO: log message to app.log print(e) + listen(on_asset_deleted, Asset, RowDestroyedSignal) diff --git a/ictv/storage/cache_manager.py b/ictv/storage/cache_manager.py index 0def84d..ff495e7 100644 --- a/ictv/storage/cache_manager.py +++ b/ictv/storage/cache_manager.py @@ -77,6 +77,18 @@ def cache_file_at_url(self, url): CacheManager._get_lock(cache_asset_hash, blocking=True) CacheManager._release_lock(cache_asset_hash) asset = Asset.selectBy(plugin_channel=self.channel_id, filename=filename, is_cached=True).getOne(None) + elif datetime.datetime.now() - asset.last_update > datetime.timedelta(minutes=5): + logger.debug('Refreshing asset %d at url %s', asset.id, url) + CacheManager._get_lock(cache_asset_hash, blocking=True) + asset.last_update = datetime.datetime.now() + try: + self.download_manager.clear_pending_task_for_asset(asset.id) + self.download_manager.enqueue_asset(asset) + except (URLError, OSError): + logger.warning('Exception encountered when attempting to update file at url %s', url, exc_info=True) + CacheManager._release_lock(cache_asset_hash) + return None + CacheManager._release_lock(cache_asset_hash) return asset _name_to_lock = {} diff --git a/ictv/storage/download_manager.py b/ictv/storage/download_manager.py index a319ce4..2b6f6fe 100644 --- a/ictv/storage/download_manager.py +++ b/ictv/storage/download_manager.py @@ -20,6 +20,7 @@ # along with ICTV. If not, see . import asyncio +from datetime import datetime from queue import Queue import os @@ -27,6 +28,7 @@ import aiohttp import magic +import werkzeug.http from sqlobject import sqlhub from ictv.common import get_root_path @@ -51,7 +53,9 @@ def enqueue_asset(self, asset): if asset.id not in self._pending_tasks: def enqueue_post_process_asset(task): mime_type, file_size = task.result() - self._post_process_queue.put((mime_type, file_size, asset)) + asset.created = datetime.now() + if mime_type is not None and file_size is not None: + self._post_process_queue.put((mime_type, file_size, asset)) task = asyncio.run_coroutine_threadsafe(DownloadManager._cache_asset(asset.filename + (asset.extension or ''), asset.path), self._loop) asset.in_flight = True @@ -66,6 +70,11 @@ def get_pending_task_for_asset(self, asset_id): """ Returns the corresponding task to the given asset. """ return self._pending_tasks[asset_id] + def clear_pending_task_for_asset(self, asset_id): + """ Resets the status of the task associated with the given asset. """ + if self.has_pending_task_for_asset(asset_id): + del self._pending_tasks[asset_id] + def stop(self): if self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) @@ -74,7 +83,7 @@ def stop(self): def _run_loop(self): sqlhub.threadConnection = SQLObjectThreadConnection.get_conn() asyncio.set_event_loop(self._loop) - if not self._loop.is_closed(): + if not self._loop.is_closed() and not self._loop.is_running(): self._loop.run_forever() self._loop.close() @@ -88,13 +97,25 @@ def _post_process_asset(self): @staticmethod async def _cache_asset(url, path): - content = await DownloadManager._get_content(url) - with open(os.path.join(get_root_path(), path), 'wb') as f: + file_path = os.path.join(get_root_path(), path) + if os.path.exists(file_path): + last_modified = werkzeug.http.http_date(os.stat(file_path).st_mtime) + else: + last_modified = None + content = await DownloadManager._get_content(url, last_modified) + if not content: + return None, None + with open(file_path, 'wb') as f: f.write(content) return magic.from_buffer(content, mime=True), len(content) @staticmethod - async def _get_content(url): + async def _get_content(url, last_modified=None): async with aiohttp.ClientSession() as session: - async with session.get(url) as resp: - return await resp.read() + async with session.get(url, headers={'If-Modified-Since': last_modified} if last_modified else {}) as resp: + if resp.status is 304: + return None + elif resp.status is 200: + return await resp.read() + else: + return None diff --git a/ictv/tests/test_code.py b/ictv/tests/test_code.py index 4dea7a5..8dc2ddb 100644 --- a/ictv/tests/test_code.py +++ b/ictv/tests/test_code.py @@ -22,7 +22,10 @@ import os import random import string -from datetime import date, datetime +import tempfile +import subprocess +from datetime import date, datetime, timedelta +from time import sleep import web from nose.tools import * @@ -32,7 +35,7 @@ from ictv.common import get_root_path from ictv.models.asset import Asset from ictv.models.building import Building -from ictv.models.channel import PluginChannel, ChannelBundle +from ictv.models.channel import PluginChannel, ChannelBundle, Channel from ictv.models.plugin import Plugin from ictv.models.plugin_param_access_rights import PluginParamAccessRights from ictv.models.role import UserPermissions, Role @@ -43,6 +46,8 @@ from ictv.common.feedbacks import get_feedbacks, add_feedback, get_next_feedbacks, ImmediateFeedback from ictv.common.json_datetime import DateTimeDecoder, DateTimeEncoder from ictv.plugin_manager import plugin_manager +from ictv.storage.cache_manager import CacheManager +from ictv.storage.download_manager import DownloadManager from ictv.tests import ICTVTestCase, FakePluginTestCase @@ -532,3 +537,45 @@ def common_utils(self): add_feedback(type=1, message="This is a test feedback from the unit test") assert_true(feedbacks_has_type(type=1, feedbacks=get_next_feedbacks())) assert_false(feedbacks_has_type(type="NOTESTINGFEEDBACK", feedbacks=get_next_feedbacks())) + + +class CacheManagerTest(ICTVTestCase): + def runTest_(self): + """ Tests the cache mechanism. """ + dm = DownloadManager() + + c = PluginChannel(name='Channel', subscription_right='public', secret='abcdef', plugin=Plugin(name='channel_plugin', activated='no')) + cm = CacheManager(c, dm) + port = 64080 + + with tempfile.TemporaryDirectory() as tmpdir: + os.chdir(tmpdir) + p = subprocess.Popen(['python3', '-m', 'http.server', '--b', '127.0.0.1', str(port)]) + sleep(2) + + with open(os.path.join(tmpdir, 'a.txt'), 'w') as f: + f.write('ABCDEFG') + + a = cm.cache_file_at_url('http://127.0.0.1:%d/a.txt' % port) + sleep(1) + + assert a.file_size == 7 + a.last_update -= timedelta(minutes=10) + + last_update = a.created + with open(os.path.join(tmpdir, 'a.txt'), 'w') as f: # There is no touch in Python + f.write('ABCDEFG') + + a = cm.cache_file_at_url('http://127.0.0.1:%d/a.txt' % port) + sleep(1) + assert a.created > last_update + last_update = a.created + + a = cm.cache_file_at_url('http://127.0.0.1:%d/a.txt' % port) + sleep(1) + assert a.created == last_update + + os.unlink(os.path.join(get_root_path()), a.path) + p.kill() + + dm.stop() diff --git a/setup.py b/setup.py index 2de0792..116be9e 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ description='ICTV is a simple content management system for digital signage on multiple screens.', install_requires=['sqlobject', 'typing', 'icalendar', 'pyyaml', 'urllib3', 'web.py>=0.40.dev0', 'yamlordereddictloader', 'pyquery', 'BeautifulSoup4', 'python-magic', 'aiohttp', 'wand', - 'feedparser', 'qrcode', 'selenium', 'python3-saml', 'pymediainfo'], + 'feedparser', 'qrcode', 'selenium', 'python3-saml', 'pymediainfo', 'werkzeug'], setup_requires=['pytest-runner', 'pytest-env'] if not is_running_on_macos else [], tests_require=['pytest', 'pytest-xdist', 'pytest-cov', 'paste', 'nose', 'coverage<5'], dependency_links=['https://github.com/formencode/formencode.git#egg=FormEncode'],