Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ictv/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ def update_database():
DBVersion(version=database_version)
db_version = DBVersion.select().getOne().version
if db_version < 1:
# Do upgrade stuff
print('Updating database to version %d' % 1)
column_sql = Asset.sqlmeta.getColumns()['last_update'].sqliteCreateSQL()
table = PluginChannel.sqlmeta.table
assert conn.queryOne('ALTER TABLE %s ADD %s' % (table, column_sql)) is None
db_version = 1
pass
DBVersion.select().getOne().set(version=db_version)

Expand Down
2 changes: 2 additions & 0 deletions ictv/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Asset(ICTVObject):
extension = StringCol(default=None)
file_size = BigIntCol(default=None) # File size in kilobytes
created = DateTimeCol(default=DateTimeCol.now)
last_update = DateTimeCol(default=DateTimeCol.now)
last_reference = DateTimeCol(default=DateTimeCol.now)
in_flight = BoolCol(default=False) # Is this asset being cached at the moment
is_cached = BoolCol(default=False) # Is this asset a cached asset from CacheManager
Expand Down Expand Up @@ -68,4 +69,5 @@ def on_asset_deleted(instance, kwargs):
# TODO: log message to app.log
print(e)


listen(on_asset_deleted, Asset, RowDestroyedSignal)
12 changes: 12 additions & 0 deletions ictv/storage/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ def cache_file_at_url(self, url):
CacheManager._get_lock(cache_asset_hash, blocking=True)
CacheManager._release_lock(cache_asset_hash)
asset = Asset.selectBy(plugin_channel=self.channel_id, filename=filename, is_cached=True).getOne(None)
elif datetime.datetime.now() - asset.last_update > datetime.timedelta(minutes=5):
logger.debug('Refreshing asset %d at url %s', asset.id, url)
CacheManager._get_lock(cache_asset_hash, blocking=True)
asset.last_update = datetime.datetime.now()
try:
self.download_manager.clear_pending_task_for_asset(asset.id)
self.download_manager.enqueue_asset(asset)
except (URLError, OSError):
logger.warning('Exception encountered when attempting to update file at url %s', url, exc_info=True)
CacheManager._release_lock(cache_asset_hash)
return None
CacheManager._release_lock(cache_asset_hash)
return asset

_name_to_lock = {}
Expand Down
35 changes: 28 additions & 7 deletions ictv/storage/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
# along with ICTV. If not, see <http://www.gnu.org/licenses/>.

import asyncio
from datetime import datetime
from queue import Queue

import os
import threading

import aiohttp
import magic
import werkzeug.http
from sqlobject import sqlhub

from ictv.common import get_root_path
Expand All @@ -51,7 +53,9 @@ def enqueue_asset(self, asset):
if asset.id not in self._pending_tasks:
def enqueue_post_process_asset(task):
mime_type, file_size = task.result()
self._post_process_queue.put((mime_type, file_size, asset))
asset.created = datetime.now()
if mime_type is not None and file_size is not None:
self._post_process_queue.put((mime_type, file_size, asset))

task = asyncio.run_coroutine_threadsafe(DownloadManager._cache_asset(asset.filename + (asset.extension or ''), asset.path), self._loop)
asset.in_flight = True
Expand All @@ -66,6 +70,11 @@ def get_pending_task_for_asset(self, asset_id):
""" Returns the corresponding task to the given asset. """
return self._pending_tasks[asset_id]

def clear_pending_task_for_asset(self, asset_id):
""" Resets the status of the task associated with the given asset. """
if self.has_pending_task_for_asset(asset_id):
del self._pending_tasks[asset_id]

def stop(self):
if self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
Expand All @@ -74,7 +83,7 @@ def stop(self):
def _run_loop(self):
sqlhub.threadConnection = SQLObjectThreadConnection.get_conn()
asyncio.set_event_loop(self._loop)
if not self._loop.is_closed():
if not self._loop.is_closed() and not self._loop.is_running():
self._loop.run_forever()
self._loop.close()

Expand All @@ -88,13 +97,25 @@ def _post_process_asset(self):

@staticmethod
async def _cache_asset(url, path):
content = await DownloadManager._get_content(url)
with open(os.path.join(get_root_path(), path), 'wb') as f:
file_path = os.path.join(get_root_path(), path)
if os.path.exists(file_path):
last_modified = werkzeug.http.http_date(os.stat(file_path).st_mtime)
else:
last_modified = None
content = await DownloadManager._get_content(url, last_modified)
if not content:
return None, None
with open(file_path, 'wb') as f:
f.write(content)
return magic.from_buffer(content, mime=True), len(content)

@staticmethod
async def _get_content(url):
async def _get_content(url, last_modified=None):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.read()
async with session.get(url, headers={'If-Modified-Since': last_modified} if last_modified else {}) as resp:
if resp.status is 304:
return None
elif resp.status is 200:
return await resp.read()
else:
return None
51 changes: 49 additions & 2 deletions ictv/tests/test_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import os
import random
import string
from datetime import date, datetime
import tempfile
import subprocess
from datetime import date, datetime, timedelta
from time import sleep

import web
from nose.tools import *
Expand All @@ -32,7 +35,7 @@
from ictv.common import get_root_path
from ictv.models.asset import Asset
from ictv.models.building import Building
from ictv.models.channel import PluginChannel, ChannelBundle
from ictv.models.channel import PluginChannel, ChannelBundle, Channel
from ictv.models.plugin import Plugin
from ictv.models.plugin_param_access_rights import PluginParamAccessRights
from ictv.models.role import UserPermissions, Role
Expand All @@ -43,6 +46,8 @@
from ictv.common.feedbacks import get_feedbacks, add_feedback, get_next_feedbacks, ImmediateFeedback
from ictv.common.json_datetime import DateTimeDecoder, DateTimeEncoder
from ictv.plugin_manager import plugin_manager
from ictv.storage.cache_manager import CacheManager
from ictv.storage.download_manager import DownloadManager
from ictv.tests import ICTVTestCase, FakePluginTestCase


Expand Down Expand Up @@ -532,3 +537,45 @@ def common_utils(self):
add_feedback(type=1, message="This is a test feedback from the unit test")
assert_true(feedbacks_has_type(type=1, feedbacks=get_next_feedbacks()))
assert_false(feedbacks_has_type(type="NOTESTINGFEEDBACK", feedbacks=get_next_feedbacks()))


class CacheManagerTest(ICTVTestCase):
def runTest_(self):
""" Tests the cache mechanism. """
dm = DownloadManager()

c = PluginChannel(name='Channel', subscription_right='public', secret='abcdef', plugin=Plugin(name='channel_plugin', activated='no'))
cm = CacheManager(c, dm)
port = 64080

with tempfile.TemporaryDirectory() as tmpdir:
os.chdir(tmpdir)
p = subprocess.Popen(['python3', '-m', 'http.server', '--b', '127.0.0.1', str(port)])
sleep(2)

with open(os.path.join(tmpdir, 'a.txt'), 'w') as f:
f.write('ABCDEFG')

a = cm.cache_file_at_url('http://127.0.0.1:%d/a.txt' % port)
sleep(1)

assert a.file_size == 7
a.last_update -= timedelta(minutes=10)

last_update = a.created
with open(os.path.join(tmpdir, 'a.txt'), 'w') as f: # There is no touch in Python
f.write('ABCDEFG')

a = cm.cache_file_at_url('http://127.0.0.1:%d/a.txt' % port)
sleep(1)
assert a.created > last_update
last_update = a.created

a = cm.cache_file_at_url('http://127.0.0.1:%d/a.txt' % port)
sleep(1)
assert a.created == last_update

os.unlink(os.path.join(get_root_path()), a.path)
p.kill()

dm.stop()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
description='ICTV is a simple content management system for digital signage on multiple screens.',
install_requires=['sqlobject', 'typing', 'icalendar', 'pyyaml', 'urllib3', 'web.py>=0.40.dev0',
'yamlordereddictloader', 'pyquery', 'BeautifulSoup4', 'python-magic', 'aiohttp', 'wand',
'feedparser', 'qrcode', 'selenium', 'python3-saml', 'pymediainfo'],
'feedparser', 'qrcode', 'selenium', 'python3-saml', 'pymediainfo', 'werkzeug'],
setup_requires=['pytest-runner', 'pytest-env'] if not is_running_on_macos else [],
tests_require=['pytest', 'pytest-xdist', 'pytest-cov', 'paste', 'nose', 'coverage<5'],
dependency_links=['https://github.com/formencode/formencode.git#egg=FormEncode'],
Expand Down