You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
337 lines
13 KiB
Python
337 lines
13 KiB
Python
# import jnius_config
|
|
# jnius_config.add_options('-Xmx256m')
|
|
from apiproxy import ApiProxy
|
|
from logger import AmbarLogger
|
|
from parsers.fileparser import FileParser
|
|
from parsers.contenttypeanalyzer import ContentTypeAnalyzer
|
|
from contentprocessors.autotagger import AutoTagger
|
|
from model import AmbarFileContent, AmbarFileMeta
|
|
from containerprocessors.archiveprocessor import ArchiveProcessor
|
|
from containerprocessors.pstprocessor import PstProcessor
|
|
from datetime import datetime
|
|
import gc
|
|
import io
|
|
import sys
|
|
import os
|
|
import time
|
|
import hashlib
|
|
import pika
|
|
import json
|
|
import base64
|
|
from hashlib import sha256
|
|
|
|
RABBIT_QUEUE_NAME = 'AMBAR_PIPELINE_QUEUE'
|
|
RABBIT_HEARTBEAT = 0
|
|
API_CALL_TIMEOUT_SECONDS = 1200
|
|
PARSE_TIMEOUT_SECONDS = 1200
|
|
|
|
pipelineId = os.getenv('id', '0')
|
|
apiUrl = os.getenv('api_url', 'http://serviceapi:8081')
|
|
webApiUrl = os.getenv('web_api_url', 'http://webapi:8080')
|
|
rabbitHost = os.getenv('rabbit_host','amqp://ambar')
|
|
|
|
ocrPdfSymbolsPerPageThreshold = int(os.getenv('ocrPdfSymbolsPerPageThreshold', 1000))
|
|
ocrPdfMaxPageCount = int(os.getenv('ocrPdfSymbolsPerPageThreshold', 5))
|
|
preserveOriginals = True if os.getenv('preserveOriginals', 'False') == 'True' else False
|
|
|
|
# instantiating Api proxy
|
|
apiProxy = ApiProxy(apiUrl, webApiUrl, API_CALL_TIMEOUT_SECONDS)
|
|
# instantiating logger
|
|
logger = AmbarLogger(apiProxy, pipelineId)
|
|
# instantiating ArchiveProcessor
|
|
archiveProcessor = ArchiveProcessor(logger, apiProxy)
|
|
# instantiating PstProcessor
|
|
pstProcessor = PstProcessor(logger, apiProxy)
|
|
# instantiating Parser
|
|
fileParser = FileParser(logger, PARSE_TIMEOUT_SECONDS, ocrPdfSymbolsPerPageThreshold, ocrPdfMaxPageCount)
|
|
# instantiating AutoTagger
|
|
autoTagger = AutoTagger(logger, apiProxy)
|
|
# checking whether to preserve originals or not
|
|
preserveOriginals = True if preserveOriginals else False
|
|
|
|
# reporting start
|
|
logger.LogMessage('info', 'started')
|
|
|
|
# connecting to Rabbit
|
|
logger.LogMessage('info', 'connecting to Rabbit {0}...'.format(rabbitHost))
|
|
|
|
try:
|
|
rabbitConnection = pika.BlockingConnection(pika.URLParameters(
|
|
'{0}?heartbeat={1}'.format(rabbitHost, RABBIT_HEARTBEAT)))
|
|
rabbitChannel = rabbitConnection.channel()
|
|
rabbitChannel.basic_qos(prefetch_count=1, all_channels=True)
|
|
logger.LogMessage('info', 'connected to Rabbit!')
|
|
except Exception as e:
|
|
logger.LogMessage('error', 'error initializing connection to Rabbit {0}'.format(repr(e)))
|
|
exit(1)
|
|
|
|
# starting pipeline
|
|
logger.LogMessage('info', 'waiting for messages...')
|
|
|
|
|
|
def ProcessFile(message):
|
|
try:
|
|
meta = message['meta']
|
|
event = message['event']
|
|
sha = None
|
|
|
|
logger.LogMessage('verbose', '{0} task received for {1}'.format(event, meta['full_name']))
|
|
|
|
if ('sha' in message):
|
|
sha = message['sha']
|
|
|
|
fileId = sha256('{0}{1}'.format(meta['source_id'],meta['full_name']).encode('utf-8')).hexdigest()
|
|
|
|
if (event == 'unlink'):
|
|
apiResp = apiProxy.HideFile(fileId)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error hidding file for {0} {1}'.format(meta['full_name'], apiResp.message))
|
|
return False
|
|
|
|
if apiResp.Ok:
|
|
logger.LogMessage('verbose', 'removed {0}'.format(meta['full_name']))
|
|
return True
|
|
|
|
if not apiResp.NotFound:
|
|
logger.LogMessage('error', 'error hidding file {0} {1} code: {2}'.format(meta['full_name'], apiResp.message, apiResp.code))
|
|
return False
|
|
|
|
return True
|
|
|
|
if (event != 'add' and event != 'change'):
|
|
print('Ignoring {0}'.format(event))
|
|
return True
|
|
|
|
apiResp = apiProxy.CheckIfMetaExists(meta)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error checking meta existance for {0} {1}'.format(meta['full_name'], apiResp.message))
|
|
return False
|
|
|
|
if apiResp.Ok:
|
|
logger.LogMessage('verbose', 'meta found for {0}'.format(meta['full_name']))
|
|
return True
|
|
|
|
if not apiResp.NotFound:
|
|
logger.LogMessage('error', 'error checking meta existance for {0} {1} {2}'.format(meta['full_name'], apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
apiResp = apiProxy.UnhideFile(fileId)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error unhiding file {0} {1}'.format(meta['full_name'], apiResp.message))
|
|
return False
|
|
|
|
if not (apiResp.Ok or apiResp.NotFound):
|
|
logger.LogMessage('error', 'error unhiding file, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
fileMeta = AmbarFileMeta.Init(meta)
|
|
|
|
if not fileMeta.initialized:
|
|
logger.LogMessage('error', 'error initializing file meta {0}'.format(fileMeta.message))
|
|
return False
|
|
|
|
if (sha):
|
|
apiResp = apiProxy.DownloadFileBySha(sha)
|
|
else:
|
|
apiResp = apiProxy.DownloadFile(fileMeta.full_name)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error downloading file {0} {1}'.format(fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not apiResp.Ok:
|
|
logger.LogMessage('error', 'error downloading file {0} {1} code: {2}'.format(fileMeta.full_name, apiResp.message, apiResp.code))
|
|
return False
|
|
|
|
fileData = apiResp.payload
|
|
|
|
sha = sha256(fileData).hexdigest()
|
|
|
|
hasParsedContent = False
|
|
fileContent = {}
|
|
|
|
apiResp = apiProxy.GetParsedFileContentFields(sha)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error retrieving parsed file content fields {0} {1}'.format(
|
|
fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not (apiResp.Ok or apiResp.NotFound):
|
|
logger.LogMessage('error', 'error retrieving parsed file content fields {0} {1} {2}'.format(
|
|
fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
if apiResp.Ok:
|
|
hasParsedContent = True
|
|
fileContent = apiResp.payload
|
|
|
|
if hasParsedContent:
|
|
apiResp = apiProxy.GetParsedFileContent(sha)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error retrieving parsed file content {0} {1}'.format(
|
|
fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not (apiResp.Ok or apiResp.NotFound):
|
|
logger.LogMessage('error', 'error retrieving parsed file content {0} {1} {2}'.format(
|
|
fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
if apiResp.NotFound:
|
|
hasParsedContent = False
|
|
|
|
if apiResp.Ok:
|
|
hasParsedContent = True
|
|
fileContent['text'] = apiResp.payload.decode('utf-8', 'ignore')
|
|
logger.LogMessage(
|
|
'verbose', 'parsed content found {0}'.format(fileMeta.full_name))
|
|
|
|
if not hasParsedContent:
|
|
# checking if file is archive
|
|
if ContentTypeAnalyzer.IsArchive(fileMeta.short_name):
|
|
archiveProcessor.Process(fileData, fileMeta, fileMeta.source_id)
|
|
|
|
# checking if file is pst
|
|
if ContentTypeAnalyzer.IsPst(fileMeta.short_name):
|
|
pstProcessor.Process(fileData, fileMeta, fileMeta.source_id)
|
|
|
|
# extracting
|
|
logger.LogMessage('verbose', 'parsing {0}'.format(fileMeta.full_name))
|
|
fileParserResp = fileParser.Parse(fileMeta.short_name, fileData)
|
|
|
|
if not fileParserResp.success:
|
|
logger.LogMessage('error', 'error parsing {0} {1}'.format(
|
|
fileMeta.full_name, fileParserResp.message))
|
|
return False
|
|
|
|
logger.LogMessage(
|
|
'verbose', 'successfully parsed {0}'.format(fileMeta.full_name))
|
|
|
|
# building Ambar File Content
|
|
fileContent = AmbarFileContent.Init(fileParserResp, sys.getsizeof(fileData))
|
|
|
|
# submitting thumbnail
|
|
if fileParserResp.thumbnail:
|
|
logger.LogMessage(
|
|
'verbose', 'submitting thumbnail {0}'.format(fileMeta.full_name))
|
|
apiResp = apiProxy.SubmitThumbnail(
|
|
sha, fileParserResp.thumbnail[0])
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error submitting thumbnail to Api {0} {1}'.format(
|
|
fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not apiResp.Ok:
|
|
logger.LogMessage('error', 'error submitting thumbnail to Api, unexpected response code {0} {1} {2}'.format(
|
|
fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
fileContent.thumb_available = True
|
|
logger.LogMessage('verbose', 'thumbnail submited {0}'.format(fileMeta.full_name))
|
|
|
|
# submitting parsed text to Api
|
|
if not ContentTypeAnalyzer.IsArchive(fileMeta.short_name) and not ContentTypeAnalyzer.IsPst(fileMeta.short_name):
|
|
logger.LogMessage('verbose', 'submitting parsed text {0}'.format(fileMeta.full_name))
|
|
|
|
apiResp = apiProxy.SubmitExtractedContent(
|
|
sha, fileContent.text.encode(encoding='utf_8', errors='ignore'))
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error submitting parsed text to Api {0} {1}'.format(fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not apiResp.Ok:
|
|
logger.LogMessage('error', 'error submitting parsed text to Api, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
logger.LogMessage('verbose', 'parsed text submited {0}'.format(fileMeta.full_name))
|
|
|
|
# submitting processed file to Api
|
|
logger.LogMessage('verbose', 'submitting parsed content {0}'.format(fileMeta.full_name))
|
|
|
|
ambarFile = {}
|
|
ambarFile['content'] = fileContent.Dict if isinstance(fileContent, AmbarFileContent) else fileContent
|
|
ambarFile['meta'] = fileMeta.Dict
|
|
ambarFile['sha256'] = sha
|
|
ambarFile['file_id'] = fileId
|
|
ambarFile['indexed_datetime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
|
|
|
apiResp = apiProxy.SubmitProcessedFile(fileId, json.dumps(dict(ambarFile)).encode(encoding='utf_8', errors='ignore'))
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error submitting parsed content to Api {0} {1}'.format(
|
|
fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not (apiResp.Ok or apiResp.Created):
|
|
logger.LogMessage('error', 'error submitting parsed content to Api, unexpected response code {0} {1} {2}'.format(
|
|
fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
logger.LogMessage('verbose', 'parsed content submited {0}'.format(fileMeta.full_name))
|
|
|
|
apiResp = apiProxy.AddMetaIdToCache(fileMeta.id)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error adding meta id to cache {0} {1}'.format(fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not apiResp.Ok:
|
|
logger.LogMessage('error', 'error adding meta id to cache, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
# removing original file
|
|
if not preserveOriginals:
|
|
apiResp = apiProxy.RemoveFileContent(sha)
|
|
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error removing original file from Ambar for {0} {1}'.format(fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not (apiResp.Ok or apiResp.NotFound):
|
|
logger.LogMessage('error', 'error removing original file from Ambar for {0}, unexpected response code {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
if apiResp.Ok:
|
|
logger.LogMessage(
|
|
'verbose', 'original file removed from Ambar for {0}'.format(fileMeta.full_name))
|
|
|
|
## tags
|
|
apiResp = apiProxy.RemoveAutoTags(fileId)
|
|
if not apiResp.Success:
|
|
logger.LogMessage('error', 'error removing autotags {0} {1}'.format(fileMeta.full_name, apiResp.message))
|
|
return False
|
|
|
|
if not apiResp.Ok:
|
|
logger.LogMessage('error', 'error removing autotags, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
|
|
return False
|
|
|
|
autoTagger.AutoTagAmbarFile(ambarFile)
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.LogMessage('error', 'error processing task {0}'.format(repr(e)))
|
|
return False
|
|
|
|
# main callback on receiving message from Rabbit
|
|
|
|
def RabbitConsumeCallback(channel, method, properties, body):
|
|
message = json.loads(body.decode('utf-8'))
|
|
if (ProcessFile(message)):
|
|
channel.basic_ack(delivery_tag=method.delivery_tag)
|
|
else:
|
|
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
|
|
gc.collect()
|
|
|
|
|
|
rabbitChannel.basic_consume(RabbitConsumeCallback, queue=RABBIT_QUEUE_NAME)
|
|
rabbitChannel.start_consuming()
|
|
|
|
exit(0)
|