You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ambar2/Pipeline/pipeline.py

337 lines
13 KiB
Python

# import jnius_config
# jnius_config.add_options('-Xmx256m')
from apiproxy import ApiProxy
from logger import AmbarLogger
from parsers.fileparser import FileParser
from parsers.contenttypeanalyzer import ContentTypeAnalyzer
from contentprocessors.autotagger import AutoTagger
from model import AmbarFileContent, AmbarFileMeta
from containerprocessors.archiveprocessor import ArchiveProcessor
from containerprocessors.pstprocessor import PstProcessor
from datetime import datetime
import gc
import io
import sys
import os
import time
import hashlib
import pika
import json
import base64
from hashlib import sha256
# --- fixed settings ---------------------------------------------------------
# Queue the pipeline consumes its tasks from.
RABBIT_QUEUE_NAME = 'AMBAR_PIPELINE_QUEUE'
# Value appended to the AMQP URL as the ``heartbeat`` query parameter.
RABBIT_HEARTBEAT = 0
# Timeout handed to ApiProxy (seconds).
API_CALL_TIMEOUT_SECONDS = 1200
# Timeout handed to FileParser (seconds).
PARSE_TIMEOUT_SECONDS = 1200

# --- settings read from the environment (with defaults) ---------------------
_env = os.environ
pipelineId = _env.get('id', '0')
apiUrl = _env.get('api_url', 'http://serviceapi:8081')
webApiUrl = _env.get('web_api_url', 'http://webapi:8080')
rabbitHost = _env.get('rabbit_host', 'amqp://ambar')
# OCR tuning values forwarded to FileParser; both must parse as integers.
ocrPdfSymbolsPerPageThreshold = int(_env.get('ocrPdfSymbolsPerPageThreshold', 1000))
ocrPdfMaxPageCount = int(_env.get('ocrPdfMaxPageCount', 5))
# Only the exact string 'True' enables keeping the originals.
preserveOriginals = _env.get('preserveOriginals', 'False') == 'True'
# Module-level wiring: build the shared service objects, then open the
# RabbitMQ channel the consumer loop will use.

# API proxy, built from both API URLs and the call timeout
apiProxy = ApiProxy(apiUrl, webApiUrl, API_CALL_TIMEOUT_SECONDS)
# logger, constructed with the API proxy and this pipeline's id
logger = AmbarLogger(apiProxy, pipelineId)
# container processors (archives and PST files)
archiveProcessor = ArchiveProcessor(logger, apiProxy)
pstProcessor = PstProcessor(logger, apiProxy)
# content parser, configured with the parse timeout and the OCR limits
fileParser = FileParser(logger, PARSE_TIMEOUT_SECONDS, ocrPdfSymbolsPerPageThreshold, ocrPdfMaxPageCount)
# auto tagger
autoTagger = AutoTagger(logger, apiProxy)
# normalise the preserve-originals flag to a strict bool
preserveOriginals = bool(preserveOriginals)

# reporting start
logger.LogMessage('info', 'started')

# connecting to Rabbit
logger.LogMessage('info', 'connecting to Rabbit {0}...'.format(rabbitHost))
try:
    rabbitConnection = pika.BlockingConnection(pika.URLParameters(
        '{0}?heartbeat={1}'.format(rabbitHost, RABBIT_HEARTBEAT)))
    rabbitChannel = rabbitConnection.channel()
    # One unacknowledged message at a time.
    # NOTE(review): the `all_channels` keyword matches the pre-1.0 pika API;
    # confirm against the pinned pika version before upgrading.
    rabbitChannel.basic_qos(prefetch_count=1, all_channels=True)
    logger.LogMessage('info', 'connected to Rabbit!')
except Exception as e:
    logger.LogMessage('error', 'error initializing connection to Rabbit {0}'.format(repr(e)))
    # sys.exit rather than the site-provided exit(): the latter is an
    # interactive helper and is not guaranteed to exist in every runtime.
    sys.exit(1)

# starting pipeline
logger.LogMessage('info', 'waiting for messages...')
def ProcessFile(message):
    """Process one pipeline task and report whether it succeeded.

    Args:
        message: decoded task. Must contain ``meta`` (with at least
            ``full_name`` and ``source_id``) and ``event``. An optional
            ``sha`` makes the file be downloaded by hash instead of by name.

    Behaviour by event:
        * ``unlink`` - the file is hidden through the API.
        * ``add`` / ``change`` - the file is downloaded, parsed (or already
          parsed content is reused), submitted to the API and auto-tagged.
        * anything else - ignored and reported as success.

    Returns:
        True when the task is finished, False on any failure. Exceptions
        raised while processing are logged and converted to False.
    """
    try:
        meta = message['meta']
        event = message['event']

        logger.LogMessage('verbose', '{0} task received for {1}'.format(event, meta['full_name']))

        # Optional content hash supplied by the producer; None when absent.
        sha = message.get('sha')

        # The file id is derived from the source id and the full file name.
        fileId = sha256('{0}{1}'.format(meta['source_id'], meta['full_name']).encode('utf-8')).hexdigest()

        # ---- unlink: hide the file and stop --------------------------------
        if event == 'unlink':
            apiResp = apiProxy.HideFile(fileId)

            if not apiResp.Success:
                logger.LogMessage('error', 'error hidding file for {0} {1}'.format(meta['full_name'], apiResp.message))
                return False

            if apiResp.Ok:
                logger.LogMessage('verbose', 'removed {0}'.format(meta['full_name']))
                return True

            if not apiResp.NotFound:
                logger.LogMessage('error', 'error hidding file {0} {1} code: {2}'.format(meta['full_name'], apiResp.message, apiResp.code))
                return False

            # NotFound: nothing to hide, treated as success.
            return True

        # ---- only add/change are processed further -------------------------
        if event not in ('add', 'change'):
            print('Ignoring {0}'.format(event))
            return True

        # ---- skip files whose meta is already known ------------------------
        apiResp = apiProxy.CheckIfMetaExists(meta)

        if not apiResp.Success:
            logger.LogMessage('error', 'error checking meta existance for {0} {1}'.format(meta['full_name'], apiResp.message))
            return False

        if apiResp.Ok:
            logger.LogMessage('verbose', 'meta found for {0}'.format(meta['full_name']))
            return True

        if not apiResp.NotFound:
            logger.LogMessage('error', 'error checking meta existance for {0} {1} {2}'.format(meta['full_name'], apiResp.code, apiResp.message))
            return False

        # ---- make sure the file is not hidden ------------------------------
        apiResp = apiProxy.UnhideFile(fileId)

        if not apiResp.Success:
            logger.LogMessage('error', 'error unhiding file {0} {1}'.format(meta['full_name'], apiResp.message))
            return False

        if not (apiResp.Ok or apiResp.NotFound):
            logger.LogMessage('error', 'error unhiding file, unexpected response code {0} {1} {2}'.format(meta['full_name'], apiResp.code, apiResp.message))
            return False

        # ---- build the meta object -----------------------------------------
        fileMeta = AmbarFileMeta.Init(meta)

        if not fileMeta.initialized:
            logger.LogMessage('error', 'error initializing file meta {0}'.format(fileMeta.message))
            return False

        # ---- download the file body ----------------------------------------
        if sha:
            apiResp = apiProxy.DownloadFileBySha(sha)
        else:
            apiResp = apiProxy.DownloadFile(fileMeta.full_name)

        if not apiResp.Success:
            logger.LogMessage('error', 'error downloading file {0} {1}'.format(fileMeta.full_name, apiResp.message))
            return False

        if not apiResp.Ok:
            logger.LogMessage('error', 'error downloading file {0} {1} code: {2}'.format(fileMeta.full_name, apiResp.message, apiResp.code))
            return False

        fileData = apiResp.payload

        # From here on the hash is always the one computed from the bytes
        # actually downloaded, whatever the message carried.
        sha = sha256(fileData).hexdigest()

        # ---- look for content that was already parsed for this hash --------
        hasParsedContent = False
        fileContent = {}

        apiResp = apiProxy.GetParsedFileContentFields(sha)

        if not apiResp.Success:
            logger.LogMessage('error', 'error retrieving parsed file content fields {0} {1}'.format(
                fileMeta.full_name, apiResp.message))
            return False

        if not (apiResp.Ok or apiResp.NotFound):
            logger.LogMessage('error', 'error retrieving parsed file content fields {0} {1} {2}'.format(
                fileMeta.full_name, apiResp.code, apiResp.message))
            return False

        if apiResp.Ok:
            hasParsedContent = True
            fileContent = apiResp.payload

        if hasParsedContent:
            apiResp = apiProxy.GetParsedFileContent(sha)

            if not apiResp.Success:
                logger.LogMessage('error', 'error retrieving parsed file content {0} {1}'.format(
                    fileMeta.full_name, apiResp.message))
                return False

            if not (apiResp.Ok or apiResp.NotFound):
                logger.LogMessage('error', 'error retrieving parsed file content {0} {1} {2}'.format(
                    fileMeta.full_name, apiResp.code, apiResp.message))
                return False

            # Fields exist but the text does not: fall back to parsing.
            if apiResp.NotFound:
                hasParsedContent = False

            if apiResp.Ok:
                hasParsedContent = True
                fileContent['text'] = apiResp.payload.decode('utf-8', 'ignore')
                logger.LogMessage(
                    'verbose', 'parsed content found {0}'.format(fileMeta.full_name))

        # ---- nothing reusable: parse the file now --------------------------
        if not hasParsedContent:
            # checking if file is archive
            if ContentTypeAnalyzer.IsArchive(fileMeta.short_name):
                archiveProcessor.Process(fileData, fileMeta, fileMeta.source_id)

            # checking if file is pst
            if ContentTypeAnalyzer.IsPst(fileMeta.short_name):
                pstProcessor.Process(fileData, fileMeta, fileMeta.source_id)

            # extracting
            logger.LogMessage('verbose', 'parsing {0}'.format(fileMeta.full_name))
            fileParserResp = fileParser.Parse(fileMeta.short_name, fileData)

            if not fileParserResp.success:
                logger.LogMessage('error', 'error parsing {0} {1}'.format(
                    fileMeta.full_name, fileParserResp.message))
                return False

            logger.LogMessage(
                'verbose', 'successfully parsed {0}'.format(fileMeta.full_name))

            # building Ambar File Content
            # len() is the number of bytes downloaded; sys.getsizeof() would
            # report the Python object's memory footprint instead, which
            # includes the object header.
            # NOTE(review): assumes the second argument of Init is the file
            # size - confirm against AmbarFileContent.
            fileContent = AmbarFileContent.Init(fileParserResp, len(fileData))

            # submitting thumbnail
            if fileParserResp.thumbnail:
                logger.LogMessage(
                    'verbose', 'submitting thumbnail {0}'.format(fileMeta.full_name))
                apiResp = apiProxy.SubmitThumbnail(
                    sha, fileParserResp.thumbnail[0])

                if not apiResp.Success:
                    logger.LogMessage('error', 'error submitting thumbnail to Api {0} {1}'.format(
                        fileMeta.full_name, apiResp.message))
                    return False

                if not apiResp.Ok:
                    logger.LogMessage('error', 'error submitting thumbnail to Api, unexpected response code {0} {1} {2}'.format(
                        fileMeta.full_name, apiResp.code, apiResp.message))
                    return False

                fileContent.thumb_available = True
                logger.LogMessage('verbose', 'thumbnail submited {0}'.format(fileMeta.full_name))

            # submitting parsed text to Api (skipped for archive and pst files)
            if not ContentTypeAnalyzer.IsArchive(fileMeta.short_name) and not ContentTypeAnalyzer.IsPst(fileMeta.short_name):
                logger.LogMessage('verbose', 'submitting parsed text {0}'.format(fileMeta.full_name))
                apiResp = apiProxy.SubmitExtractedContent(
                    sha, fileContent.text.encode(encoding='utf_8', errors='ignore'))

                if not apiResp.Success:
                    logger.LogMessage('error', 'error submitting parsed text to Api {0} {1}'.format(fileMeta.full_name, apiResp.message))
                    return False

                if not apiResp.Ok:
                    logger.LogMessage('error', 'error submitting parsed text to Api, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
                    return False

                logger.LogMessage('verbose', 'parsed text submited {0}'.format(fileMeta.full_name))

        # ---- submitting processed file to Api ------------------------------
        logger.LogMessage('verbose', 'submitting parsed content {0}'.format(fileMeta.full_name))

        # fileContent is an AmbarFileContent when freshly parsed, or the
        # payload fetched from the API when parsed content was reused.
        ambarFile = {
            'content': fileContent.Dict if isinstance(fileContent, AmbarFileContent) else fileContent,
            'meta': fileMeta.Dict,
            'sha256': sha,
            'file_id': fileId,
            # timestamp trimmed from microseconds to milliseconds
            'indexed_datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        }

        apiResp = apiProxy.SubmitProcessedFile(fileId, json.dumps(ambarFile).encode(encoding='utf_8', errors='ignore'))

        if not apiResp.Success:
            logger.LogMessage('error', 'error submitting parsed content to Api {0} {1}'.format(
                fileMeta.full_name, apiResp.message))
            return False

        if not (apiResp.Ok or apiResp.Created):
            logger.LogMessage('error', 'error submitting parsed content to Api, unexpected response code {0} {1} {2}'.format(
                fileMeta.full_name, apiResp.code, apiResp.message))
            return False

        logger.LogMessage('verbose', 'parsed content submited {0}'.format(fileMeta.full_name))

        # ---- remember the meta id ------------------------------------------
        apiResp = apiProxy.AddMetaIdToCache(fileMeta.id)

        if not apiResp.Success:
            logger.LogMessage('error', 'error adding meta id to cache {0} {1}'.format(fileMeta.full_name, apiResp.message))
            return False

        if not apiResp.Ok:
            logger.LogMessage('error', 'error adding meta id to cache, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
            return False

        # ---- removing original file ----------------------------------------
        if not preserveOriginals:
            apiResp = apiProxy.RemoveFileContent(sha)

            if not apiResp.Success:
                logger.LogMessage('error', 'error removing original file from Ambar for {0} {1}'.format(fileMeta.full_name, apiResp.message))
                return False

            if not (apiResp.Ok or apiResp.NotFound):
                logger.LogMessage('error', 'error removing original file from Ambar for {0}, unexpected response code {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
                return False

            if apiResp.Ok:
                logger.LogMessage(
                    'verbose', 'original file removed from Ambar for {0}'.format(fileMeta.full_name))

        # ---- tags: drop the old auto tags, then tag again ------------------
        apiResp = apiProxy.RemoveAutoTags(fileId)

        if not apiResp.Success:
            logger.LogMessage('error', 'error removing autotags {0} {1}'.format(fileMeta.full_name, apiResp.message))
            return False

        if not apiResp.Ok:
            logger.LogMessage('error', 'error removing autotags, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
            return False

        autoTagger.AutoTagAmbarFile(ambarFile)

        return True
    except Exception as e:
        # Boundary handler: a broken task must not take the consumer down.
        logger.LogMessage('error', 'error processing task {0}'.format(repr(e)))
        return False
# main callback on receiving message from Rabbit
def RabbitConsumeCallback(channel, method, properties, body):
    """Handle one delivery from the pipeline queue.

    The body is decoded as UTF-8 JSON and handed to ProcessFile. The delivery
    is acked when ProcessFile returns a truthy value and nacked without
    requeue otherwise. A body that cannot be decoded or parsed is logged and
    nacked the same way instead of raising out of the consumer loop.

    Args:
        channel: channel the delivery arrived on; used to ack/nack.
        method: delivery metadata; only ``delivery_tag`` is read.
        properties: message properties (unused).
        body: raw message bytes.
    """
    try:
        message = json.loads(body.decode('utf-8'))
    except ValueError as e:
        # UnicodeDecodeError and json.JSONDecodeError both derive from
        # ValueError. Letting them propagate would abort start_consuming()
        # and leave the message un-acked for redelivery.
        logger.LogMessage('error', 'error decoding task message {0}'.format(repr(e)))
        processed = False
    else:
        processed = ProcessFile(message)

    if processed:
        channel.basic_ack(delivery_tag=method.delivery_tag)
    else:
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    # Run a collection after every message, as the file payload handled
    # during processing is no longer needed.
    gc.collect()
# Register the callback on the pipeline queue and block in the consume loop.
# NOTE(review): callback-first basic_consume matches the pre-1.0 pika API;
# confirm against the pinned pika version before upgrading.
rabbitChannel.basic_consume(RabbitConsumeCallback, queue=RABBIT_QUEUE_NAME)
rabbitChannel.start_consuming()
# Reached only once start_consuming() returns. sys.exit is used because the
# site-provided exit() is an interactive helper not guaranteed to exist.
sys.exit(0)