Release v2.1.8 Sponsored by IFIC

master v2.1.8
Ilya.P 8 years ago
parent 15b503f7b1
commit ea6dcff5f4

@ -1,5 +1,19 @@
Change log Change log
========== ==========
2.1.8 (2018-05-16)
-------------------
This Release is sponsored by [IFIC.co.uk](http://www.ific.co.uk/), special thanks to Dr. Barry Clark
### What's new:
- Dramatic increase of crawling speed
- Storage consumption is minimal, now you download files directly from crawled fs
- File removal sync -> if file removed from folder it will be marked as removed in Ambar
- Added ability to ignore files by folders, extensions and file names
- Bug fixes and small changes to docker-compose.yml
Update notes: before update, please download the latest [docker-compose.yml](https://github.com/RD17/ambar/blob/master/docker-compose.yml) and [read the installation instruction](https://github.com/RD17/ambar/blob/master/Install.md).
2.0.0rc (2018-04-18) 2.0.0rc (2018-04-18)
------------------- -------------------

@ -50,11 +50,6 @@ const HintCard = (props) => {
tags:ocr,ui-upload tags:ocr,ui-upload
</span> - {localization.searchPage.tagsQueryLabel} </span> - {localization.searchPage.tagsQueryLabel}
</li> </li>
<li>
<span className={classes.clickableSpan} onTouchTap={() => { performSearchByQuery('entities:"hello@ambar.cloud"') }}>
entities:"hello@ambar.cloud"
</span> - {localization.searchPage.entitiesQueryLabel}
</li>
<li> <li>
<span className={classes.clickableSpan} onTouchTap={() => { performSearchByQuery('show:removed') }}> <span className={classes.clickableSpan} onTouchTap={() => { performSearchByQuery('show:removed') }}>
show:removed show:removed

@ -32,7 +32,7 @@ class DetailedView extends Component {
key={hit.file_id} key={hit.file_id}
hit={hit} hit={hit}
thumbnailUri={urls.ambarWebApiGetThumbnail(hit.sha256)} thumbnailUri={urls.ambarWebApiGetThumbnail(hit.sha256)}
downloadUri={urls.ambarWebApiGetFile(hit.meta.download_uri)} downloadUri={urls.ambarWebApiGetFile(hit.meta.full_name)}
{...this.props} {...this.props}
/> />
)} )}

@ -101,14 +101,14 @@ class DetailedCard extends Component {
</div>} </div>}
<CardActions className={classes.searchResultRowCardFooter}> <CardActions className={classes.searchResultRowCardFooter}>
<div style={{ display: 'flex', justifyContent: !isHidden ? 'space-between' : 'flex-end', width: '100%' }}> <div style={{ display: 'flex', justifyContent: !isHidden ? 'space-between' : 'flex-end', width: '100%' }}>
{!isHidden && <div> {!isHidden && !hidden_mark && meta.source_id != 'ui-upload' && !meta.extra.some(item => item.key === 'from_container') && <div>
{preserveOriginals && <FlatButton <FlatButton
icon={<FileDownloadIcon />} icon={<FileDownloadIcon />}
label={localization.searchPage.downloadLabel} label={localization.searchPage.downloadLabel}
title={localization.searchPage.downloadDescriptionLabel} title={localization.searchPage.downloadDescriptionLabel}
primary={true} primary={true}
onTouchTap={() => { window.open(downloadUri) }} onTouchTap={() => { window.open(downloadUri) }}
/>} />
</div>} </div>}
<div> <div>
{!hidden_mark && <FlatButton {!hidden_mark && <FlatButton
@ -119,13 +119,6 @@ class DetailedCard extends Component {
style={{ color: 'grey' }} style={{ color: 'grey' }}
onTouchTap={() => hideFile(fileId)} onTouchTap={() => hideFile(fileId)}
/>} />}
{(isHidden || hidden_mark) && <FlatButton
icon={<UndoIcon />}
label={localization.searchPage.restoreLabel}
title={localization.searchPage.restoreDescriptionLabel}
primary={true}
onTouchTap={() => showFile(fileId)}
/>}
</div> </div>
</div>} </div>}
</CardActions> </CardActions>

@ -55,7 +55,7 @@ class TableView extends Component {
key={hit.file_id} key={hit.file_id}
hit={hit} hit={hit}
thumbnailUri={urls.ambarWebApiGetThumbnail(hit.sha256)} thumbnailUri={urls.ambarWebApiGetThumbnail(hit.sha256)}
downloadUri={urls.ambarWebApiGetFile(hit.meta.download_uri)} downloadUri={urls.ambarWebApiGetFile(hit.meta.full_name)}
{...this.props} {...this.props}
/> />
)} )}

@ -80,7 +80,7 @@ class TableRowResult extends Component {
<UpdatedDateTimeLabel meta={meta} searchQuery={searchQuery} formatFunc={getFormattedTime} /> <UpdatedDateTimeLabel meta={meta} searchQuery={searchQuery} formatFunc={getFormattedTime} />
</TableRowColumn> </TableRowColumn>
<TableRowColumn style={{ width: '220px' }}> <TableRowColumn style={{ width: '220px' }}>
{preserveOriginals && <IconButton onTouchTap={() => { window.open(downloadUri) }} {!hidden_mark && meta.source_id != 'ui-upload' && !meta.extra.some(item => item.key === 'from_container') && <IconButton onTouchTap={() => { window.open(downloadUri) }}
title={localization.searchPage.downloadDescriptionLabel}> title={localization.searchPage.downloadDescriptionLabel}>
<FileDownloadIcon color='#00bcd4' hoverColor='#80deea' /> <FileDownloadIcon color='#00bcd4' hoverColor='#80deea' />
</IconButton>} </IconButton>}
@ -95,9 +95,6 @@ class TableRowResult extends Component {
{!hidden_mark && <IconButton onTouchTap={() => hideFile(fileId)} title={localization.searchPage.removeLabel}> {!hidden_mark && <IconButton onTouchTap={() => hideFile(fileId)} title={localization.searchPage.removeLabel}>
<DeleteIcon color='#00bcd4' hoverColor='#80deea' /> <DeleteIcon color='#00bcd4' hoverColor='#80deea' />
</IconButton>} </IconButton>}
{(isHidden || hidden_mark) && <IconButton onTouchTap={() => showFile(fileId)} title={localization.searchPage.restoreLabel}>
<UndoIcon color='#00bcd4' hoverColor='#80deea' />
</IconButton>}
</TableRowColumn> </TableRowColumn>
</TableRow> </TableRow>
) )

@ -35,10 +35,10 @@ class RateUs extends Component {
</div> </div>
<div className={classes.rateUsText}> <div className={classes.rateUsText}>
<p> <p>
Let's spread the word that Ambar is awesome! Help us make Ambar even better, follow us on Twitter or give us your stars on Github. Let's spread the word that Ambar is awesome! Help us make Ambar even better, give us your stars on Github.
</p> </p>
<p> <p>
Together we will build the best document search system in the world! Together we will build the best document search engine in the world!
</p> </p>
<div style={{display: 'flex', justifyContent: 'center'}}> <div style={{display: 'flex', justifyContent: 'center'}}>
<FlatButton <FlatButton
@ -47,12 +47,6 @@ class RateUs extends Component {
onTouchTap={() => goToUrl('https://github.com/RD17/ambar')} onTouchTap={() => goToUrl('https://github.com/RD17/ambar')}
icon={<img height={20} src={GithubIcon} />} icon={<img height={20} src={GithubIcon} />}
/> />
<FlatButton
label="Tweet"
primary={true}
onTouchTap={() => goToUrl('https://twitter.com/intent/tweet?text=%23Ambar%20is%20awesome%20%23DocumentSearchSystem!%20Check%20it%20out%20on%20https%3A%2F%2Fambar.cloud')}
icon={<img height={20} src={TwitterIcon} />}
/>
</div> </div>
</div> </div>
</div> </div>

@ -5,7 +5,7 @@ const init = (apiHost) => {
ambarWebApiSearchByStringQuery: (query, page, size) => `${apiHost}/api/search?query=${encodeURIComponent(query)}&page=${page}&size=${size}`, ambarWebApiSearchByStringQuery: (query, page, size) => `${apiHost}/api/search?query=${encodeURIComponent(query)}&page=${page}&size=${size}`,
ambarWebApiLoadContentHightlight: (fileId, query) => `${apiHost}/api/search/${fileId}/?query=${encodeURIComponent(query)}`, ambarWebApiLoadContentHightlight: (fileId, query) => `${apiHost}/api/search/${fileId}/?query=${encodeURIComponent(query)}`,
ambarWebApiLoadFullContentHightlight: (fileId, query) => `${apiHost}/api/search/${fileId}/full?query=${encodeURIComponent(query)}`, ambarWebApiLoadFullContentHightlight: (fileId, query) => `${apiHost}/api/search/${fileId}/full?query=${encodeURIComponent(query)}`,
ambarWebApiGetFile: (metaId) => `${apiHost}/api/files/${metaId}`, ambarWebApiGetFile: (fullPath) => `${apiHost}/api/files/download?path=${encodeURIComponent(fullPath)}`,
ambarWebApiGetFileText: (metaId) => `${apiHost}/api/files/${metaId}/text`, ambarWebApiGetFileText: (metaId) => `${apiHost}/api/files/${metaId}/text`,
ambarWebApiGetCrawlers: () => `${apiHost}/api/crawlers`, ambarWebApiGetCrawlers: () => `${apiHost}/api/crawlers`,

@ -1,7 +1,7 @@
# Install docker and docker-compose # Install docker and docker-compose
To install and configure Ambar you need an expertise in unix, Docker and Docker Compose. To install and configure Ambar you need an expertise in unix, Docker and Docker Compose.
If you have any difficulties installing and running Ambar you can request a dedicated support session by mailing us on hello@ambar.cloud If you have any difficulties installing and running Ambar you can request a dedicated support session by mailing us on [hello@ambar.cloud](mailto:hello@ambar.cloud)
Please refer to official [Docker](https://docs.docker.com/install/) and [Docker Compose](https://docs.docker.com/compose/install/) installation instructions. Please refer to official [Docker](https://docs.docker.com/install/) and [Docker Compose](https://docs.docker.com/compose/install/) installation instructions.
@ -44,11 +44,40 @@ Then modify it:
## Set up your crawlers ## Set up your crawlers
- Find ```crawler0``` block - this is a template for your new crawler - Find ```${crawlerName}``` block - this is a template for your new crawler
- Replace ```${crawlerName}``` with desired name for your crawler (only lowercase latin letters and dashes are supported) - Replace ```${crawlerName}``` with desired name for your crawler (only lowercase latin letters and dashes are supported). Check that service block name and crawler name are the same
- Replace ```${pathToCrawl}``` with path to a local folder to be crawled, if you want to crawl SMB or FTP - just mount it with standard unix tools - Replace ```${pathToCrawl}``` with path to a local folder to be crawled, if you want to crawl SMB or FTP - just mount it with standard unix tools
You can add additional crawlers by copying ```crawler0``` segment and editing its settings (don't forget to edit the service name, e.g. to ```crawler1```). ### Optional settings
- `ignoreFolders` - ignore folders by [glob pattern](https://github.com/isaacs/node-glob#glob-primer)
- `ignoreExtensions` - ignore file extensions by [glob pattern](https://github.com/isaacs/node-glob#glob-primer) (default: .{exe,dll})
- `ignoreFileNames` - ignore file names by [glob pattern](https://github.com/isaacs/node-glob#glob-primer) (default: ~*)
- `maxFileSize` - max file size (default: 300mb)
### Crawler configuration example:
```
Docs:
depends_on:
serviceapi:
condition: service_healthy
image: ambar/ambar-local-crawler
restart: always
networks:
- internal_network
expose:
- "8082"
environment:
- name=Docs
- ignoreFolders=**/ForSharing/**
- ignoreExtensions=.{exe,dll,rar}
- ignoreFileNames=*backup*
- maxFileSize=15mb
volumes:
- /media/Docs:/usr/data
```
You can add more crawlers by copying ```${crawlerName}``` segment and editing its settings (don't forget to edit the service name).
# Start Ambar # Start Ambar
@ -58,4 +87,4 @@ To start Ambar run ```docker-compose up -d```.
Ambar UI will be accessible on ```http://${ambarHostIpAddress}/``` Ambar UI will be accessible on ```http://${ambarHostIpAddress}/```
If you have any difficulties installing and running Ambar you can request a dedicated support session by mailing us on hello@ambar.cloud If you have any difficulties installing and running Ambar you can request a dedicated support session by mailing us on [hello@ambar.cloud](mailto:hello@ambar.cloud)

@ -8,7 +8,7 @@ COPY . .
RUN yarn install RUN yarn install
RUN yarn run build RUN yarn run build
CMD node dist CMD node --max-old-space-size=8096 dist
HEALTHCHECK --interval=5s --timeout=30s --retries=50 \ HEALTHCHECK --interval=5s --timeout=30s --retries=50 \
CMD curl -f localhost:8082/api/ || exit 1 CMD curl -f localhost:8082/api/ || exit 1

@ -21,6 +21,7 @@
"author": "RD17", "author": "RD17",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"amqplib": "^0.5.2",
"babel-eslint": "^7.1.0", "babel-eslint": "^7.1.0",
"babel-polyfill": "^6.26.0", "babel-polyfill": "^6.26.0",
"body-parser": "^1.13.3", "body-parser": "^1.13.3",
@ -32,6 +33,7 @@
"eslint-plugin-promise": "^3.3.0", "eslint-plugin-promise": "^3.3.0",
"express": "^4.13.3", "express": "^4.13.3",
"idempotent-babel-polyfill": "^0.1.1", "idempotent-babel-polyfill": "^0.1.1",
"minimatch": "^3.0.4",
"moment": "^2.15.0", "moment": "^2.15.0",
"request": "^2.85.0", "request": "^2.85.0",
"request-promise-native": "^1.0.5", "request-promise-native": "^1.0.5",

@ -1,6 +1,10 @@
import { version, name, description } from '../../package.json' import { version, name, description } from '../../package.json'
import { Router } from 'express' import { Router } from 'express'
// import config from '../config' import fs from 'fs'
import path from 'path'
import config from '../config'
import * as ApiProxy from '../services/ApiProxy'
export default () => { export default () => {
let api = Router() let api = Router()
@ -13,5 +17,40 @@ export default () => {
}) })
}) })
// GET /api/download?path=<relative path> — streams a crawled file from disk.
// Responds 400 when no path is supplied, 403 when the path escapes the
// crawl root, 404 when the file does not exist, and 500 on fs/stream errors.
api.get('/download', (req, res) => {
    const filePath = req.query.path
    if (!filePath) {
        res.sendStatus(400)
        return
    }

    let absolutePath = null
    let doesFileExist = false

    try {
        // Security: resolve the requested path and make sure it stays inside
        // config.crawlPath — otherwise "?path=../../etc/passwd" would let a
        // caller read arbitrary files (path traversal).
        const crawlRoot = path.resolve(config.crawlPath)
        absolutePath = path.resolve(path.join(crawlRoot, filePath))
        if (absolutePath !== crawlRoot && !absolutePath.startsWith(crawlRoot + path.sep)) {
            res.sendStatus(403)
            return
        }
        doesFileExist = fs.existsSync(absolutePath)
    } catch (error) {
        ApiProxy.logData(config.name, 'error', `Error: ${error}`)
        res.status(500).json({ error: error })
        return
    }

    if (!doesFileExist) {
        res.sendStatus(404)
        return
    }

    res.download(absolutePath, (error) => {
        if (error) {
            // res.download may fail after headers were flushed mid-stream;
            // only send a JSON error if nothing has been written yet.
            if (!res.headersSent) {
                res.status(500).json({ error: error })
            }
            ApiProxy.logData(config.name, 'error', `[${absolutePath}] Error: ${error}`)
        }
    })
})
return api return api
} }

@ -1,11 +1,14 @@
const defaultConfig = { const defaultConfig = {
"port": 8082, "port": 8082,
"bodyLimit": "10mb", "bodyLimit": "10mb",
"crawlPath": "C:\\Dropbox\\Development\\Git\\Ambar\\LocalCrawler\\node_modules", "crawlPath": "/usr/data",
"apiUrl": "http://ambar:8081", "apiUrl": "http://serviceapi:8080",
"allowedFilesRegex": '(\\.doc[a-z]*$)|(\\.xls[a-z]*$)|(\\.txt$)|(\\.pst$)|(\\.csv$)|(\\.htm[a-z]*$)|(\\.ppt[a-z]*$)|(\\.pdf$)|(\\.msg$)|(\\.zip$)|(\\.eml$)|(\\.rtf$)|(\\.md$)|(\\.png$)|(\\.bmp$)|(\\.tif[f]*$)|(\\.jp[e]*g$)', "ignoreFolders": "**/test/**",
"name": "nodemodules-crawler", "ignoreExtensions": ".{exe,dll}",
"maxFileSize": "30mb" "ignoreFileNames": "~*",
"name": "localhost",
"maxFileSize": "300mb",
"rabbitHost": "amqp://rabbit"
} }
let config = null let config = null

@ -4,16 +4,20 @@ import cors from 'cors'
import bodyParser from 'body-parser' import bodyParser from 'body-parser'
import api from './api' import api from './api'
import config from './config' import config from './config'
import cluster from 'cluster'
import 'babel-core/register' import 'babel-core/register'
import 'idempotent-babel-polyfill' import 'idempotent-babel-polyfill'
import { FileWatchService, ApiProxy } from './services' import { FileWatchService, ApiProxy } from './services'
ApiProxy.logData(config.name, 'info', 'Crawler initialized') let app = null
FileWatchService.startWatch() if (cluster.isMaster) {
ApiProxy.logData(config.name, 'info', 'API runs on master thread')
ApiProxy.logData(config.name, 'info', 'Creating fork for the file-watcher process')
cluster.fork()
let app = express() app = express()
app.server = http.createServer(app) app.server = http.createServer(app)
app.use(cors({ app.use(cors({
@ -29,6 +33,18 @@ app.use(bodyParser.json({
app.use('/api', api()) app.use('/api', api())
app.server.listen(process.env.PORT || config.port) app.server.listen(process.env.PORT || config.port)
console.log(`Started on ${app.server.address().address}:${app.server.address().port}`) console.log(`Started API on ${app.server.address().address}:${app.server.address().port}`)
} else {
ApiProxy.logData(config.name, 'info', 'File-watcher runs on worker thread')
FileWatchService.startWatch()
.catch(err => {
ApiProxy.logData(config.name, 'error', `Error: ${err}`)
process.exit(1)
})
}
export default app export default app

@ -104,10 +104,10 @@ export const logData = (sourceId, type, message) => new Promise((resolve, reject
} }
} }
return request(options) request(options)
.then(() => { resolve()}) .then(() => resolve())
.catch(err => { .catch(err => {
console.log(err) console.error(err)
reject(err) reject(err)
}) })
}) })

@ -1,15 +0,0 @@
// Legacy file helpers (this module is deleted by the release this diff
// belongs to — downloads now stream directly from the crawled fs).
import crypto from 'crypto'
import fs from 'fs'
// NOTE(review): fs.stat is callback-based in core Node — without
// promisification this returns undefined, not a promise; verify how
// callers consumed it before this module was removed.
export const getFileMeta = (path) => fs.stat(path)
// Computes the hex SHA-256 of a file by streaming its contents.
// NOTE(review): no 'error' handler on the read stream — a read failure
// would leave the returned promise pending forever.
export const getFileSha = (path) => new Promise((resolve) => {
const shaSum = crypto.createHash('sha256')
const readStream = fs.ReadStream(path)
readStream.on('data', (data) => shaSum.update(data))
readStream.on('end', () => {
const result = shaSum.digest('hex')
resolve(result)
})
})

@ -3,50 +3,28 @@ import chokidar from 'chokidar'
import path from 'path' import path from 'path'
import config from '../config' import config from '../config'
import bytes from 'bytes' import bytes from 'bytes'
import minimatch from 'minimatch'
import * as ApiProxy from './ApiProxy' import * as ApiProxy from './ApiProxy'
import * as FileService from './FileService' import * as QueueProxy from './QueueProxy'
const WAIT_MS = 500 export const startWatch = () => new Promise((resolve, reject) => {
QueueProxy.initRabbit()
let tasks = [] .then(() => {
chokidar.watch(config.crawlPath, { usePolling: true, awaitWriteFinish: true })
const allowedFilesRegex = new RegExp(config.allowedFilesRegex, 'gi') .on('error', error => {
ApiProxy.logData(config.name, 'error', `Chokidar error: ${error}`)
export const startWatch = () => { })
processTasks()
chokidar.watch(config.crawlPath, { usePolling: true })
.on('all', (event, pathToFile, stat) => { .on('all', (event, pathToFile, stat) => {
if (event === 'add' || event === 'change') { if (event === 'add' || event === 'change' || event === 'unlink') {
tasks.push({ event, pathToFile, stat }) addTask(event, pathToFile, stat)
} }
}) })
} })
.catch(err => {
const processTasks = async () => { ApiProxy.logData(config.name, 'error', `Error: ${err}`)
//eslint-disable-next-line no-constant-condition reject(err)
while (true) { })
if (tasks.length === 0) {
await sleep()
continue
}
const { pathToFile, stat } = tasks[0]
tasks = tasks.slice(1)
try {
await tryAddFile(pathToFile, stat)
} catch (err) {
await ApiProxy.logData(config.name, 'error', `failed to submit ${pathToFile}`)
}
}
}
const sleep = () => new Promise((resolve) => {
setTimeout(() => {
resolve()
}, WAIT_MS)
}) })
const shouldIgnore = (pathToFile, stat) => { const shouldIgnore = (pathToFile, stat) => {
@ -57,60 +35,59 @@ const shouldIgnore = (pathToFile, stat) => {
const maxFileSizeBytes = bytes.parse(config.maxFileSize) const maxFileSizeBytes = bytes.parse(config.maxFileSize)
if (stat.size === 0) { if (stat.size === 0) {
ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: file.size != 0`)
return true return true
} }
if (stat.size > maxFileSizeBytes) { if (stat.size > maxFileSizeBytes) {
ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: file.size [${bytes(stat.size)}] > maxFileSize [${bytes(maxFileSizeBytes)}]`)
return true
}
const extName = path.extname(pathToFile)
if (!extName) {
ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: File should have extension`)
return true
}
if (config.ignoreExtensions && minimatch(extName, config.ignoreExtensions)) {
ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: ignore extensions [${config.ignoreExtensions}]`)
return true return true
} }
if (!allowedFilesRegex.test(pathToFile)) { const fileName = path.basename(pathToFile, extName)
if (config.ignoreFileNames && minimatch(fileName, config.ignoreFileNames)) {
ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: ignore fileNames [${config.ignoreFileNames}]`)
return true return true
} }
if (!path.extname(pathToFile)) { const dirName = path.dirname(pathToFile)
if (config.ignoreFolders && minimatch(dirName, config.ignoreFolders)) {
ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: ignore folders [${config.ignoreFolders}]`)
return true return true
} }
return false return false
} }
const tryAddFile = async (pathToFile, stat) => { const addTask = (event, pathToFile, stat) => {
let normalizedPath = path.normalize(pathToFile) let normalizedPath = path.normalize(pathToFile)
normalizedPath = `//${normalizedPath.replace(config.crawlPath, config.name)}`.replace(/\\/g, '/') normalizedPath = `//${normalizedPath.replace(config.crawlPath, config.name)}`.replace(/\\/g, '/')
if (shouldIgnore(normalizedPath, stat)) { if (shouldIgnore(normalizedPath, stat)) {
await ApiProxy.logData(config.name, 'info', `${normalizedPath} ignoring`)
return return
} }
const meta = { const meta = {
full_name: normalizedPath, full_name: normalizedPath,
updated_datetime: moment(stat.mtime).format('YYYY-MM-DD HH:mm:ss.SSS'), updated_datetime: !stat ? '' : moment(stat.mtime).format('YYYY-MM-DD HH:mm:ss.SSS'),
created_datetime: moment(stat.atime).format('YYYY-MM-DD HH:mm:ss.SSS'), created_datetime: !stat ? '' :moment(stat.atime).format('YYYY-MM-DD HH:mm:ss.SSS'),
source_id: config.name, source_id: config.name,
short_name: path.basename(normalizedPath), short_name: path.basename(normalizedPath),
extension: path.extname(normalizedPath), extension: path.extname(normalizedPath),
extra: [] extra: []
} }
const metaExists = await ApiProxy.doesFileMetaExist(meta) QueueProxy.enqueueMessage({ event: event, meta: meta })
if (metaExists) {
await ApiProxy.logData(config.name, 'info', `${normalizedPath} meta exists`)
return
}
const sha = await FileService.getFileSha(pathToFile)
const contentExist = await ApiProxy.doesParsedContentExist(sha)
if (contentExist) {
await ApiProxy.logData(config.name, 'info', `${normalizedPath} - content found`)
} else {
await ApiProxy.addFileContent(pathToFile, sha)
await ApiProxy.logData(config.name, 'info', `${normalizedPath} - content added`)
}
await ApiProxy.addFileMeta(meta, sha, config.name)
await ApiProxy.logData(config.name, 'info', `${normalizedPath} - meta updated`)
} }

@ -0,0 +1,38 @@
import amqp from 'amqplib'
import config from '../config'
const AMBAR_PIPELINE_EXCHANGE = "AMBAR_PIPELINE_EXCHANGE"
let channel = null
// Images and PDFs get a higher pipeline priority (1) than all other
// file types (2), so the cheap-to-parse files clear the queue first.
const getPipelineMessagePriority = (fileName) => {
    const highPriorityPattern = /(\.jp[e]*g$)|(\.png$)|(\.bmp$)|(\.tif[f]*$)|(\.pdf$)/i
    return highPriorityPattern.test(fileName) ? 1 : 2
}
// Publishes a crawler event onto the Ambar pipeline exchange, with a
// message priority derived from the file name (images/PDFs first).
// `message` is either { event, meta } from the file watcher or any object
// carrying an explicit `fileName`.
// NOTE(review): assumes initRabbit() has already resolved — `channel`
// is null until then and publish would throw; confirm call order in index.
export const enqueueMessage = (message) => {
const fileName = message.fileName || message.meta.short_name
const priority = getPipelineMessagePriority(fileName)
channel.publish(AMBAR_PIPELINE_EXCHANGE, '', Buffer.from(JSON.stringify(message)), { priority: priority })
}
// Connects to RabbitMQ (heartbeat disabled) and opens the channel used by
// enqueueMessage. Resolves once the channel is ready; rejects if the
// connection or channel creation fails.
// Fix: the original wrapped an existing promise chain in `new Promise`
// (explicit-construction anti-pattern) — returning the chain directly is
// equivalent and cannot leak an unsettled promise.
export const initRabbit = () => amqp.connect(`${config.rabbitHost}?heartbeat=0`)
    .then((conn) => {
        // A broken broker connection is fatal for the crawler: log and rethrow.
        conn.on('error', (err) => {
            //eslint-disable-next-line no-console
            console.error('Rabbit error!')
            throw err
        })
        return conn.createChannel()
    })
    .then((ch) => {
        // Stash the channel for enqueueMessage; resolve with undefined,
        // matching the original contract.
        channel = ch
    })

@ -54,6 +54,16 @@ ajv@^5.1.0:
fast-json-stable-stringify "^2.0.0" fast-json-stable-stringify "^2.0.0"
json-schema-traverse "^0.3.0" json-schema-traverse "^0.3.0"
amqplib@^0.5.2:
version "0.5.2"
resolved "http://192.168.1.113:4873/amqplib/-/amqplib-0.5.2.tgz#d2d7313c7ffaa4d10bcf1e6252de4591b6cc7b63"
dependencies:
bitsyntax "~0.0.4"
bluebird "^3.4.6"
buffer-more-ints "0.0.2"
readable-stream "1.x >=1.1.9"
safe-buffer "^5.0.1"
ansi-escapes@^1.1.0: ansi-escapes@^1.1.0:
version "1.4.0" version "1.4.0"
resolved "https://registry.yarnpkg.com/ansi-escapes/-/ansi-escapes-1.4.0.tgz#d3a8a83b319aa67793662b13e761c7911422306e" resolved "https://registry.yarnpkg.com/ansi-escapes/-/ansi-escapes-1.4.0.tgz#d3a8a83b319aa67793662b13e761c7911422306e"
@ -854,6 +864,10 @@ balanced-match@^0.4.1:
version "0.4.2" version "0.4.2"
resolved "https://registry.yarnpkg.com/balanced-match/-/balanced-match-0.4.2.tgz#cb3f3e3c732dc0f01ee70b403f302e61d7709838" resolved "https://registry.yarnpkg.com/balanced-match/-/balanced-match-0.4.2.tgz#cb3f3e3c732dc0f01ee70b403f302e61d7709838"
balanced-match@^1.0.0:
version "1.0.0"
resolved "http://192.168.1.113:4873/balanced-match/-/balanced-match-1.0.0.tgz#89b4d199ab2bee49de164ea02b89ce462d71b767"
base@^0.11.1: base@^0.11.1:
version "0.11.2" version "0.11.2"
resolved "http://192.168.1.113:4873/base/-/base-0.11.2.tgz#7bde5ced145b6d551a90db87f83c558b4eb48a8f" resolved "http://192.168.1.113:4873/base/-/base-0.11.2.tgz#7bde5ced145b6d551a90db87f83c558b4eb48a8f"
@ -876,12 +890,22 @@ binary-extensions@^1.0.0:
version "1.8.0" version "1.8.0"
resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-1.8.0.tgz#48ec8d16df4377eae5fa5884682480af4d95c774" resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-1.8.0.tgz#48ec8d16df4377eae5fa5884682480af4d95c774"
bitsyntax@~0.0.4:
version "0.0.4"
resolved "http://192.168.1.113:4873/bitsyntax/-/bitsyntax-0.0.4.tgz#eb10cc6f82b8c490e3e85698f07e83d46e0cba82"
dependencies:
buffer-more-ints "0.0.2"
block-stream@*: block-stream@*:
version "0.0.9" version "0.0.9"
resolved "https://registry.yarnpkg.com/block-stream/-/block-stream-0.0.9.tgz#13ebfe778a03205cfe03751481ebb4b3300c126a" resolved "https://registry.yarnpkg.com/block-stream/-/block-stream-0.0.9.tgz#13ebfe778a03205cfe03751481ebb4b3300c126a"
dependencies: dependencies:
inherits "~2.0.0" inherits "~2.0.0"
bluebird@^3.4.6:
version "3.5.1"
resolved "http://192.168.1.113:4873/bluebird/-/bluebird-3.5.1.tgz#d9551f9de98f1fcda1e683d17ee91a0602ee2eb9"
body-parser@^1.13.3: body-parser@^1.13.3:
version "1.16.1" version "1.16.1"
resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.16.1.tgz#51540d045adfa7a0c6995a014bb6b1ed9b802329" resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.16.1.tgz#51540d045adfa7a0c6995a014bb6b1ed9b802329"
@ -922,6 +946,13 @@ brace-expansion@^1.0.0:
balanced-match "^0.4.1" balanced-match "^0.4.1"
concat-map "0.0.1" concat-map "0.0.1"
brace-expansion@^1.1.7:
version "1.1.11"
resolved "http://192.168.1.113:4873/brace-expansion/-/brace-expansion-1.1.11.tgz#3c7fcbf529d87226f3d2f52b966ff5271eb441dd"
dependencies:
balanced-match "^1.0.0"
concat-map "0.0.1"
braces@^1.8.2: braces@^1.8.2:
version "1.8.5" version "1.8.5"
resolved "https://registry.yarnpkg.com/braces/-/braces-1.8.5.tgz#ba77962e12dff969d6b76711e914b737857bf6a7" resolved "https://registry.yarnpkg.com/braces/-/braces-1.8.5.tgz#ba77962e12dff969d6b76711e914b737857bf6a7"
@ -945,6 +976,10 @@ braces@^2.3.0, braces@^2.3.1:
split-string "^3.0.2" split-string "^3.0.2"
to-regex "^3.0.1" to-regex "^3.0.1"
buffer-more-ints@0.0.2:
version "0.0.2"
resolved "http://192.168.1.113:4873/buffer-more-ints/-/buffer-more-ints-0.0.2.tgz#26b3885d10fa13db7fc01aae3aab870199e0124c"
buffer-shims@^1.0.0: buffer-shims@^1.0.0:
version "1.0.0" version "1.0.0"
resolved "https://registry.yarnpkg.com/buffer-shims/-/buffer-shims-1.0.0.tgz#9978ce317388c649ad8793028c3477ef044a8b51" resolved "https://registry.yarnpkg.com/buffer-shims/-/buffer-shims-1.0.0.tgz#9978ce317388c649ad8793028c3477ef044a8b51"
@ -2276,6 +2311,10 @@ is-windows@^1.0.2:
version "1.0.2" version "1.0.2"
resolved "http://192.168.1.113:4873/is-windows/-/is-windows-1.0.2.tgz#d1850eb9791ecd18e6182ce12a30f396634bb19d" resolved "http://192.168.1.113:4873/is-windows/-/is-windows-1.0.2.tgz#d1850eb9791ecd18e6182ce12a30f396634bb19d"
isarray@0.0.1:
version "0.0.1"
resolved "http://192.168.1.113:4873/isarray/-/isarray-0.0.1.tgz#8a18acfca9a8f4177e09abfc6038939b05d1eedf"
isarray@1.0.0, isarray@^1.0.0, isarray@~1.0.0: isarray@1.0.0, isarray@^1.0.0, isarray@~1.0.0:
version "1.0.0" version "1.0.0"
resolved "https://registry.yarnpkg.com/isarray/-/isarray-1.0.0.tgz#bb935d48582cba168c06834957a54a3e07124f11" resolved "https://registry.yarnpkg.com/isarray/-/isarray-1.0.0.tgz#bb935d48582cba168c06834957a54a3e07124f11"
@ -2580,6 +2619,12 @@ minimatch@^3.0.0, minimatch@^3.0.2:
dependencies: dependencies:
brace-expansion "^1.0.0" brace-expansion "^1.0.0"
minimatch@^3.0.4:
version "3.0.4"
resolved "http://192.168.1.113:4873/minimatch/-/minimatch-3.0.4.tgz#5166e286457f03306064be5497e8dbb0c3d32083"
dependencies:
brace-expansion "^1.1.7"
minimist@0.0.8: minimist@0.0.8:
version "0.0.8" version "0.0.8"
resolved "https://registry.yarnpkg.com/minimist/-/minimist-0.0.8.tgz#857fcabfc3397d2625b8228262e86aa7a011b05d" resolved "https://registry.yarnpkg.com/minimist/-/minimist-0.0.8.tgz#857fcabfc3397d2625b8228262e86aa7a011b05d"
@ -3039,6 +3084,15 @@ read-all-stream@^3.0.0:
pinkie-promise "^2.0.0" pinkie-promise "^2.0.0"
readable-stream "^2.0.0" readable-stream "^2.0.0"
"readable-stream@1.x >=1.1.9":
version "1.1.14"
resolved "http://192.168.1.113:4873/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9"
dependencies:
core-util-is "~1.0.0"
inherits "~2.0.1"
isarray "0.0.1"
string_decoder "~0.10.x"
readable-stream@^2.0.0, "readable-stream@^2.0.0 || ^1.1.13", readable-stream@^2.0.2, readable-stream@^2.2.2: readable-stream@^2.0.0, "readable-stream@^2.0.0 || ^1.1.13", readable-stream@^2.0.2, readable-stream@^2.2.2:
version "2.2.2" version "2.2.2"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.2.2.tgz#a9e6fec3c7dda85f8bb1b3ba7028604556fc825e" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.2.2.tgz#a9e6fec3c7dda85f8bb1b3ba7028604556fc825e"

@ -0,0 +1,117 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"pythonPath": "${config:python.pythonPath}",
"type": "python",
"request": "launch",
"program": "${file}"
},
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"localRoot": "${workspaceFolder}",
"remoteRoot": "${workspaceFolder}",
"port": 3000,
"secret": "my_secret",
"host": "localhost"
},
{
"name": "Python: Terminal (integrated)",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Python: Terminal (external)",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "externalTerminal"
},
{
"name": "Python: Django",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/manage.py",
"args": [
"runserver",
"--noreload",
"--nothreading"
],
"debugOptions": [
"RedirectOutput",
"Django"
]
},
{
"name": "Python: Flask (0.11.x or later)",
"type": "python",
"request": "launch",
"module": "flask",
"env": {
"FLASK_APP": "${workspaceFolder}/app.py"
},
"args": [
"run",
"--no-debugger",
"--no-reload"
]
},
{
"name": "Python: Module",
"type": "python",
"request": "launch",
"module": "module.name"
},
{
"name": "Python: Pyramid",
"type": "python",
"request": "launch",
"args": [
"${workspaceFolder}/development.ini"
],
"debugOptions": [
"RedirectOutput",
"Pyramid"
]
},
{
"name": "Python: Watson",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/console.py",
"args": [
"dev",
"runserver",
"--noreload=True"
]
},
{
"name": "Python: All debug Options",
"type": "python",
"request": "launch",
"pythonPath": "${config:python.pythonPath}",
"program": "${file}",
"module": "module.name",
"env": {
"VAR1": "1",
"VAR2": "2"
},
"envFile": "${workspaceFolder}/.env",
"args": [
"arg1",
"arg2"
],
"debugOptions": [
"RedirectOutput"
]
}
]
}

@ -36,7 +36,7 @@ RUN mkdir /pst-temp
ENV JAVA_HOME /usr/lib/jvm/default-java ENV JAVA_HOME /usr/lib/jvm/default-java
CMD python ./pipeline.py -id $id -api_url $api_url -rabbit_host $rabbit_host CMD python ./pipeline.py
HEALTHCHECK --interval=5s --timeout=30s --retries=50 \ HEALTHCHECK --interval=5s --timeout=30s --retries=50 \
CMD if (pidof -x python > /dev/null) then (exit 0) else (exit 1) fi CMD if (pidof -x python > /dev/null) then (exit 0) else (exit 1) fi

@ -1,10 +1,12 @@
import io import io
import re import re
import requests import requests
import urllib.parse
class ApiProxy: class ApiProxy:
def __init__(self, ApiUrl, ApiCallTimeoutSeconds): def __init__(self, ApiUrl, WebApiUrl, ApiCallTimeoutSeconds):
self.apiUrl = ApiUrl self.apiUrl = ApiUrl
self.webApiUrl = WebApiUrl
self.apiCallTimeoutSeconds = ApiCallTimeoutSeconds self.apiCallTimeoutSeconds = ApiCallTimeoutSeconds
def GetTaggingRules(self): def GetTaggingRules(self):
@ -58,6 +60,18 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def CheckIfMetaExists(self, Meta):
apiResp = RestApiResponse()
try:
apiUri = '{0}/api/files/meta/exists'.format(self.apiUrl)
req = requests.post(apiUri, json = Meta, timeout = self.apiCallTimeoutSeconds)
apiResp.result = 'ok'
apiResp.code = req.status_code
except requests.exceptions.RequestException as ex:
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
def CreateAmbarFileContent(self, FileData, Sha256): def CreateAmbarFileContent(self, FileData, Sha256):
apiResp = RestApiResponse() apiResp = RestApiResponse()
try: try:
@ -91,10 +105,10 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def GetFileContent(self, sha): def GetFileContent(self, Sha):
apiResp = RestApiResponse() apiResp = RestApiResponse()
try: try:
apiUri = '{0}/api/files/content/{1}'.format(self.apiUrl, sha) apiUri = '{0}/api/files/content/{1}'.format(self.apiUrl, Sha)
req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds) req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
if req.status_code == 200: if req.status_code == 200:
contentDispositionHeader = req.headers['content-disposition'] contentDispositionHeader = req.headers['content-disposition']
@ -114,6 +128,76 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def HideFile(self, FileId):
apiResp = RestApiResponse()
try:
apiUri = '{0}/api/files/hide/{1}'.format(self.webApiUrl, FileId)
req = requests.put(apiUri, timeout = self.apiCallTimeoutSeconds)
try:
apiResp.message = req.text
except:
pass
apiResp.result = 'ok'
apiResp.code = req.status_code
except requests.exceptions.RequestException as ex:
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
def UnhideFile(self, FileId):
apiResp = RestApiResponse()
try:
apiUri = '{0}/api/files/unhide/{1}'.format(self.webApiUrl, FileId)
req = requests.put(apiUri, timeout = self.apiCallTimeoutSeconds)
try:
apiResp.message = req.text
except:
pass
apiResp.result = 'ok'
apiResp.code = req.status_code
except requests.exceptions.RequestException as ex:
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
def DownloadFile(self, FullName):
apiResp = RestApiResponse()
try:
apiUri = '{0}/api/files/download?path={1}'.format(self.webApiUrl, urllib.parse.quote_plus(FullName))
req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
if req.status_code == 200:
apiResp.payload = req.content
else:
try:
apiResp.message = req.text
except:
pass
apiResp.result = 'ok'
apiResp.code = req.status_code
except requests.exceptions.RequestException as ex:
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
def DownloadFileBySha(self, Sha):
apiResp = RestApiResponse()
try:
apiUri = '{0}/api/files/download?sha={1}'.format(self.webApiUrl, urllib.parse.quote_plus(Sha))
req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
if req.status_code == 200:
apiResp.payload = req.content
else:
try:
apiResp.message = req.text
except:
pass
apiResp.result = 'ok'
apiResp.code = req.status_code
except requests.exceptions.RequestException as ex:
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
def GetParsedFileContent(self, Sha): def GetParsedFileContent(self, Sha):
apiResp = RestApiResponse() apiResp = RestApiResponse()
try: try:
@ -176,12 +260,11 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def SubmitProcessedFile(self, FileId, AmbarFileBytes): def AddMetaIdToCache(self, MetaId):
apiResp = RestApiResponse() apiResp = RestApiResponse()
try: try:
files = {'file': (FileId, AmbarFileBytes)} apiUri = '{0}/api/files/meta/{1}/processed'.format(self.apiUrl, MetaId)
apiUri = '{0}/api/files/file/{1}/processed'.format(self.apiUrl, FileId) req = requests.post(apiUri, timeout = self.apiCallTimeoutSeconds)
req = requests.post(apiUri, files=files, timeout = self.apiCallTimeoutSeconds)
try: try:
apiResp.message = req.text apiResp.message = req.text
except: except:
@ -193,11 +276,12 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def RemoveAutoTags(self, FileId): def SubmitProcessedFile(self, FileId, AmbarFileBytes):
apiResp = RestApiResponse() apiResp = RestApiResponse()
try: try:
apiUri = '{0}/api/files/autotags/{1}'.format(self.apiUrl, FileId) files = {'file': (FileId, AmbarFileBytes)}
req = requests.delete(apiUri, timeout = self.apiCallTimeoutSeconds) apiUri = '{0}/api/files/file/{1}/processed'.format(self.apiUrl, FileId)
req = requests.post(apiUri, files=files, timeout = self.apiCallTimeoutSeconds)
try: try:
apiResp.message = req.text apiResp.message = req.text
except: except:
@ -209,11 +293,11 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def AddFileTag(self, FileId, TagType, TagName): def RemoveAutoTags(self, FileId):
apiResp = RestApiResponse() apiResp = RestApiResponse()
try: try:
apiUri = '{0}/api/tags/service/{1}/{2}/{3}'.format(self.apiUrl, FileId, TagType, TagName) apiUri = '{0}/api/files/autotags/{1}'.format(self.apiUrl, FileId)
req = requests.post(apiUri, timeout = self.apiCallTimeoutSeconds) req = requests.delete(apiUri, timeout = self.apiCallTimeoutSeconds)
try: try:
apiResp.message = req.text apiResp.message = req.text
except: except:
@ -225,11 +309,11 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def CallExternalNER(self, ExternalNERUri, FileId, Sha): def AddFileTag(self, FileId, TagType, TagName):
apiResp = RestApiResponse() apiResp = RestApiResponse()
try: try:
body = { 'fileId': FileId, 'sha': Sha } apiUri = '{0}/api/tags/service/{1}/{2}/{3}'.format(self.apiUrl, FileId, TagType, TagName)
req = requests.post(ExternalNERUri, json=body, timeout = self.apiCallTimeoutSeconds) req = requests.post(apiUri, timeout = self.apiCallTimeoutSeconds)
try: try:
apiResp.message = req.text apiResp.message = req.text
except: except:
@ -271,29 +355,6 @@ class ApiProxy:
apiResp.message = str(ex) apiResp.message = str(ex)
return apiResp return apiResp
def GetAmbarCrawlerFileRegex(self, CrawlerId):
apiResp = RestApiResponse()
try:
apiUri = '{0}/api/crawlers/{1}'.format(self.apiUrl, CrawlerId)
req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
if req.status_code == 200:
try:
apiResp.payload = req.json()['file_regex']
except:
apiResp.result = 'error'
apiResp.message = str(ex)
else:
try:
apiResp.message = req.text
except:
pass
apiResp.result = 'ok'
apiResp.code = req.status_code
except requests.exceptions.RequestException as ex:
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
class RestApiResponse: class RestApiResponse:
def __init__(self): def __init__(self):
self.result = 'ok' self.result = 'ok'

@ -14,9 +14,6 @@ class ArchiveProcessor():
def Process(self, FileData, FileMeta, SourceId): def Process(self, FileData, FileMeta, SourceId):
self.logger.LogMessage('verbose','unzipping {0}'.format(FileMeta.full_name)) self.logger.LogMessage('verbose','unzipping {0}'.format(FileMeta.full_name))
##TODO: Get fileRegex from crawler settings
fileRegex = re.compile('(\\.doc[a-z]*$)|(\\.xls[a-z]*$)|(\\.txt$)|(\\.csv$)|(\\.htm[a-z]*$)|(\\.ppt[a-z]*$)|(\\.pdf$)|(\\.msg$)|(\\.zip$)|(\\.eml$)|(\\.rtf$)|(\\.md$)|(\\.png$)|(\\.bmp$)|(\\.tif[f]*$)|(\\.jp[e]*g$)',re.I)
try: try:
with ZipFile(io.BytesIO(FileData)) as zipFile: with ZipFile(io.BytesIO(FileData)) as zipFile:
for zipFileInfo in zipFile.infolist(): for zipFileInfo in zipFile.infolist():
@ -25,10 +22,6 @@ class ArchiveProcessor():
except: except:
unicodeName = zipFileInfo.filename unicodeName = zipFileInfo.filename
if not fileRegex.search(unicodeName):
self.logger.LogMessage('verbose','ignoring {0}/{1}'.format(FileMeta.full_name, unicodeName))
continue
fullNameInArchive = '{0}/{1}'.format(FileMeta.full_name, unicodeName) fullNameInArchive = '{0}/{1}'.format(FileMeta.full_name, unicodeName)
createUpdateTime = datetime( createUpdateTime = datetime(
zipFileInfo.date_time[0], zipFileInfo.date_time[0],
@ -80,7 +73,7 @@ class ArchiveProcessor():
self.logger.LogMessage('verbose', 'content found {0}'.format(fullNameInArchive)) self.logger.LogMessage('verbose', 'content found {0}'.format(fullNameInArchive))
## sending meta back to queue ## sending meta back to queue
fileMeta = AmbarFileMeta.InitWithoutId(createUpdateTime, createUpdateTime, unicodeName, fullNameInArchive, FileMeta.source_id) fileMeta = AmbarFileMeta.InitWithoutId(createUpdateTime, createUpdateTime, unicodeName, fullNameInArchive, FileMeta.source_id, [{'key': 'from_container', 'value': 'true'}])
apiResp = self.apiProxy.EnqueueAmbarFileMeta(fileMeta, sha, SourceId) apiResp = self.apiProxy.EnqueueAmbarFileMeta(fileMeta, sha, SourceId)
@ -92,10 +85,6 @@ class ArchiveProcessor():
self.logger.LogMessage('verbose', 'bad meta, ignoring... {0}'.format(fileMeta.full_name)) self.logger.LogMessage('verbose', 'bad meta, ignoring... {0}'.format(fileMeta.full_name))
continue continue
if apiResp.InsufficientStorage:
self.logger.LogMessage('verbose', 'insufficient storage'.format(fileMeta.full_name))
continue
if not apiResp.Ok: if not apiResp.Ok:
self.logger.LogMessage('error', 'unexpected response on adding meta {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message)) self.logger.LogMessage('error', 'unexpected response on adding meta {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
continue continue

@ -135,10 +135,9 @@ class PstProcessor():
self.logger.LogMessage('verbose', 'content found {0}'.format(fullNameInArchive)) self.logger.LogMessage('verbose', 'content found {0}'.format(fullNameInArchive))
# sending meta back to queue # sending meta back to queue
fileMeta = AmbarFileMeta.InitWithoutId(FileMeta.created_datetime, FileMeta.updated_datetime, fileName, fullNameInArchive, FileMeta.source_id) fileMeta = AmbarFileMeta.InitWithoutId(FileMeta.created_datetime, FileMeta.updated_datetime, fileName, fullNameInArchive, FileMeta.source_id, [{'key': 'from_container', 'value': 'true'}])
apiResp = self.apiProxy.EnqueueAmbarFileMeta( apiResp = self.apiProxy.EnqueueAmbarFileMeta(fileMeta, sha, SourceId)
fileMeta, sha, SourceId)
if not apiResp.Success: if not apiResp.Success:
self.logger.LogMessage('error', 'error adding meta {0} {1}'.format( self.logger.LogMessage('error', 'error adding meta {0} {1}'.format(
@ -149,10 +148,6 @@ class PstProcessor():
self.logger.LogMessage('verbose', 'bad meta, ignoring... {0}'.format(fileMeta.full_name)) self.logger.LogMessage('verbose', 'bad meta, ignoring... {0}'.format(fileMeta.full_name))
continue continue
if apiResp.InsufficientStorage:
self.logger.LogMessage('verbose', 'insufficient storage'.format(fileMeta.full_name))
continue
if not apiResp.Ok: if not apiResp.Ok:
self.logger.LogMessage('error', 'unexpected response on adding meta {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message)) self.logger.LogMessage('error', 'unexpected response on adding meta {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
continue continue

@ -167,7 +167,7 @@ class AmbarFileMeta:
return fullNameParts return fullNameParts
@classmethod @classmethod
def InitFromDictWithId(cls, MetaDict): def Init(cls, MetaDict):
amFileMeta = cls() amFileMeta = cls()
try: try:
amFileMeta.full_name = MetaDict['full_name'] amFileMeta.full_name = MetaDict['full_name']
@ -178,7 +178,7 @@ class AmbarFileMeta:
amFileMeta.source_id = MetaDict['source_id'] amFileMeta.source_id = MetaDict['source_id']
amFileMeta.created_datetime = MetaDict['created_datetime'] amFileMeta.created_datetime = MetaDict['created_datetime']
amFileMeta.updated_datetime = MetaDict['updated_datetime'] amFileMeta.updated_datetime = MetaDict['updated_datetime']
amFileMeta.id = MetaDict['id'] amFileMeta.id = sha256('{0}{1}{2}{3}'.format(MetaDict['source_id'],MetaDict['full_name'],MetaDict['created_datetime'],MetaDict['updated_datetime']).encode('utf-8')).hexdigest()
## non serializable content ## non serializable content
amFileMeta.initialized = True amFileMeta.initialized = True
amFileMeta.message = 'ok' amFileMeta.message = 'ok'
@ -189,14 +189,14 @@ class AmbarFileMeta:
@classmethod @classmethod
def InitWithoutId(cls, CreateTime, UpdateTime, ShortName, FullName, def InitWithoutId(cls, CreateTime, UpdateTime, ShortName, FullName,
AmbarCrawlerId): AmbarCrawlerId, Extra = []):
amFileMeta = cls() amFileMeta = cls()
try: try:
amFileMeta.full_name = FullName amFileMeta.full_name = FullName
amFileMeta.full_name_parts = AmbarFileMeta.ParseFullNameIntoParts(FullName) amFileMeta.full_name_parts = AmbarFileMeta.ParseFullNameIntoParts(FullName)
amFileMeta.short_name = ShortName amFileMeta.short_name = ShortName
amFileMeta.extension = path.splitext(ShortName)[1] if path.splitext(ShortName)[1] != '' else path.splitext(ShortName)[0] amFileMeta.extension = path.splitext(ShortName)[1] if path.splitext(ShortName)[1] != '' else path.splitext(ShortName)[0]
amFileMeta.extra = [] amFileMeta.extra = Extra
amFileMeta.source_id = AmbarCrawlerId amFileMeta.source_id = AmbarCrawlerId
if type(CreateTime) is str: if type(CreateTime) is str:
amFileMeta.created_datetime = CreateTime amFileMeta.created_datetime = CreateTime

@ -12,7 +12,6 @@ from datetime import datetime
import gc import gc
import io import io
import sys import sys
import argparse
import os import os
import time import time
import hashlib import hashlib
@ -26,21 +25,19 @@ RABBIT_HEARTBEAT = 0
API_CALL_TIMEOUT_SECONDS = 1200 API_CALL_TIMEOUT_SECONDS = 1200
PARSE_TIMEOUT_SECONDS = 1200 PARSE_TIMEOUT_SECONDS = 1200
parser = argparse.ArgumentParser() pipelineId = os.getenv('id', '0')
parser.add_argument('-id', default='0') apiUrl = os.getenv('api_url', 'http://serviceapi:8081')
parser.add_argument('-api_url', default='http://ambar:8081') webApiUrl = os.getenv('web_api_url', 'http://webapi:8080')
parser.add_argument('-rabbit_host', default='amqp://ambar') rabbitHost = os.getenv('rabbit_host','amqp://ambar')
ocrPdfSymbolsPerPageThreshold = int(os.getenv('ocrPdfSymbolsPerPageThreshold', 1000)) ocrPdfSymbolsPerPageThreshold = int(os.getenv('ocrPdfSymbolsPerPageThreshold', 1000))
ocrPdfMaxPageCount = int(os.getenv('ocrPdfSymbolsPerPageThreshold', 5)) ocrPdfMaxPageCount = int(os.getenv('ocrPdfSymbolsPerPageThreshold', 5))
preserveOriginals = True if os.getenv('preserveOriginals', 'False') == 'True' else False preserveOriginals = True if os.getenv('preserveOriginals', 'False') == 'True' else False
args = parser.parse_args()
# instantiating Api proxy # instantiating Api proxy
apiProxy = ApiProxy(args.api_url, API_CALL_TIMEOUT_SECONDS) apiProxy = ApiProxy(apiUrl, webApiUrl, API_CALL_TIMEOUT_SECONDS)
# instantiating logger # instantiating logger
logger = AmbarLogger(apiProxy, args.id) logger = AmbarLogger(apiProxy, pipelineId)
# instantiating ArchiveProcessor # instantiating ArchiveProcessor
archiveProcessor = ArchiveProcessor(logger, apiProxy) archiveProcessor = ArchiveProcessor(logger, apiProxy)
# instantiating PstProcessor # instantiating PstProcessor
@ -56,11 +53,11 @@ preserveOriginals = True if preserveOriginals else False
logger.LogMessage('info', 'started') logger.LogMessage('info', 'started')
# connecting to Rabbit # connecting to Rabbit
logger.LogMessage( logger.LogMessage('info', 'connecting to Rabbit {0}...'.format(rabbitHost))
'info', 'connecting to Rabbit {0}...'.format(args.rabbit_host))
try: try:
rabbitConnection = pika.BlockingConnection(pika.URLParameters( rabbitConnection = pika.BlockingConnection(pika.URLParameters(
'{0}?heartbeat={1}'.format(args.rabbit_host, RABBIT_HEARTBEAT))) '{0}?heartbeat={1}'.format(rabbitHost, RABBIT_HEARTBEAT)))
rabbitChannel = rabbitConnection.channel() rabbitChannel = rabbitConnection.channel()
rabbitChannel.basic_qos(prefetch_count=1, all_channels=True) rabbitChannel.basic_qos(prefetch_count=1, all_channels=True)
logger.LogMessage('info', 'connected to Rabbit!') logger.LogMessage('info', 'connected to Rabbit!')
@ -72,17 +69,87 @@ except Exception as e:
logger.LogMessage('info', 'waiting for messages...') logger.LogMessage('info', 'waiting for messages...')
def ProcessFile(sha, fileId, meta, sourceId): def ProcessFile(message):
try: try:
logger.LogMessage('verbose', 'task received {0}'.format(sha)) meta = message['meta']
event = message['event']
sha = None
logger.LogMessage('verbose', '{0} task received for {1}'.format(event, meta['full_name']))
if ('sha' in message):
sha = message['sha']
fileId = sha256('{0}{1}'.format(meta['source_id'],meta['full_name']).encode('utf-8')).hexdigest()
if (event == 'unlink'):
apiResp = apiProxy.HideFile(fileId)
if not apiResp.Success:
logger.LogMessage('error', 'error hidding file for {0} {1}'.format(meta['full_name'], apiResp.message))
return False
if apiResp.Ok:
logger.LogMessage('verbose', 'removed {0}'.format(meta['full_name']))
return True
if not apiResp.NotFound:
logger.LogMessage('error', 'error hidding file {0} {1} code: {2}'.format(meta['full_name'], apiResp.message, apiResp.code))
return False
return True
if (event != 'add' and event != 'change'):
print('Ignoring {0}'.format(event))
return True
apiResp = apiProxy.CheckIfMetaExists(meta)
fileMeta = AmbarFileMeta.InitFromDictWithId(meta) if not apiResp.Success:
logger.LogMessage('error', 'error checking meta existance for {0} {1}'.format(meta['full_name'], apiResp.message))
return False
if apiResp.Ok:
logger.LogMessage('verbose', 'meta found for {0}'.format(meta['full_name']))
return True
if not apiResp.NotFound:
logger.LogMessage('error', 'error checking meta existance for {0} {1} {2}'.format(meta['full_name'], apiResp.code, apiResp.message))
return False
apiResp = apiProxy.UnhideFile(fileId)
if not apiResp.Success:
logger.LogMessage('error', 'error unhiding file {0} {1}'.format(meta['full_name'], apiResp.message))
return False
if not (apiResp.Ok or apiResp.NotFound):
logger.LogMessage('error', 'error unhiding file, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
return False
fileMeta = AmbarFileMeta.Init(meta)
if not fileMeta.initialized: if not fileMeta.initialized:
logger.LogMessage( logger.LogMessage('error', 'error initializing file meta {0}'.format(fileMeta.message))
'error', 'error initializing file meta {0}'.format(fileMeta.message))
return False return False
if (sha):
apiResp = apiProxy.DownloadFileBySha(sha)
else:
apiResp = apiProxy.DownloadFile(fileMeta.full_name)
if not apiResp.Success:
logger.LogMessage('error', 'error downloading file {0} {1}'.format(fileMeta.full_name, apiResp.message))
return False
if not apiResp.Ok:
logger.LogMessage('error', 'error downloading file {0} {1} code: {2}'.format(fileMeta.full_name, apiResp.message, apiResp.code))
return False
fileData = apiResp.payload
sha = sha256(fileData).hexdigest()
hasParsedContent = False hasParsedContent = False
fileContent = {} fileContent = {}
@ -125,42 +192,13 @@ def ProcessFile(sha, fileId, meta, sourceId):
'verbose', 'parsed content found {0}'.format(fileMeta.full_name)) 'verbose', 'parsed content found {0}'.format(fileMeta.full_name))
if not hasParsedContent: if not hasParsedContent:
apiResp = apiProxy.GetFileContent(sha)
if not apiResp.Success:
logger.LogMessage('error', 'error retrieving file content {0} {1}'.format(
fileMeta.full_name, apiResp.message))
return False
if apiResp.NotFound:
logger.LogMessage(
'verbose', 'file content not found {0}'.format(fileMeta.full_name))
return False
if not apiResp.Ok:
logger.LogMessage('error', 'error retrieving file content {0} {1} {2}'.format(
fileMeta.full_name, apiResp.code, apiResp.message))
return False
# file received
fileData = apiResp.payload
logger.LogMessage(
'verbose', 'file content received {0}'.format(fileMeta.full_name))
# checking received sha with calculated payload sha
calculatedSha = sha256(fileData).hexdigest()
if (calculatedSha != sha):
logger.LogMessage('error', 'calculated sha ({0}) is not equal to received sha ({1}) for {2}'.format(
calculatedSha, sha, fileMeta.full_name))
return False
# checking if file is archive # checking if file is archive
if ContentTypeAnalyzer.IsArchive(fileMeta.short_name): if ContentTypeAnalyzer.IsArchive(fileMeta.short_name):
archiveProcessor.Process(fileData, fileMeta, sourceId) archiveProcessor.Process(fileData, fileMeta, fileMeta.source_id)
# checking if file is pst # checking if file is pst
if ContentTypeAnalyzer.IsPst(fileMeta.short_name): if ContentTypeAnalyzer.IsPst(fileMeta.short_name):
pstProcessor.Process(fileData, fileMeta, sourceId) pstProcessor.Process(fileData, fileMeta, fileMeta.source_id)
# extracting # extracting
logger.LogMessage('verbose', 'parsing {0}'.format(fileMeta.full_name)) logger.LogMessage('verbose', 'parsing {0}'.format(fileMeta.full_name))
@ -205,13 +243,11 @@ def ProcessFile(sha, fileId, meta, sourceId):
sha, fileContent.text.encode(encoding='utf_8', errors='ignore')) sha, fileContent.text.encode(encoding='utf_8', errors='ignore'))
if not apiResp.Success: if not apiResp.Success:
logger.LogMessage('error', 'error submitting parsed text to Api {0} {1}'.format( logger.LogMessage('error', 'error submitting parsed text to Api {0} {1}'.format(fileMeta.full_name, apiResp.message))
fileMeta.full_name, apiResp.message))
return False return False
if not apiResp.Ok: if not apiResp.Ok:
logger.LogMessage('error', 'error submitting parsed text to Api, unexpected response code {0} {1} {2}'.format( logger.LogMessage('error', 'error submitting parsed text to Api, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
fileMeta.full_name, apiResp.code, apiResp.message))
return False return False
logger.LogMessage('verbose', 'parsed text submited {0}'.format(fileMeta.full_name)) logger.LogMessage('verbose', 'parsed text submited {0}'.format(fileMeta.full_name))
@ -226,8 +262,7 @@ def ProcessFile(sha, fileId, meta, sourceId):
ambarFile['file_id'] = fileId ambarFile['file_id'] = fileId
ambarFile['indexed_datetime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] ambarFile['indexed_datetime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
apiResp = apiProxy.SubmitProcessedFile(fileId, json.dumps( apiResp = apiProxy.SubmitProcessedFile(fileId, json.dumps(dict(ambarFile)).encode(encoding='utf_8', errors='ignore'))
dict(ambarFile)).encode(encoding='utf_8', errors='ignore'))
if not apiResp.Success: if not apiResp.Success:
logger.LogMessage('error', 'error submitting parsed content to Api {0} {1}'.format( logger.LogMessage('error', 'error submitting parsed content to Api {0} {1}'.format(
@ -239,21 +274,28 @@ def ProcessFile(sha, fileId, meta, sourceId):
fileMeta.full_name, apiResp.code, apiResp.message)) fileMeta.full_name, apiResp.code, apiResp.message))
return False return False
logger.LogMessage( logger.LogMessage('verbose', 'parsed content submited {0}'.format(fileMeta.full_name))
'verbose', 'parsed content submited {0}'.format(fileMeta.full_name))
apiResp = apiProxy.AddMetaIdToCache(fileMeta.id)
if not apiResp.Success:
logger.LogMessage('error', 'error adding meta id to cache {0} {1}'.format(fileMeta.full_name, apiResp.message))
return False
if not apiResp.Ok:
logger.LogMessage('error', 'error adding meta id to cache, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
return False
# removing original file # removing original file
if not preserveOriginals: if not preserveOriginals:
apiResp = apiProxy.RemoveFileContent(sha) apiResp = apiProxy.RemoveFileContent(sha)
if not apiResp.Success: if not apiResp.Success:
logger.LogMessage('error', 'error removing original file from Ambar for {0} {1}'.format( logger.LogMessage('error', 'error removing original file from Ambar for {0} {1}'.format(fileMeta.full_name, apiResp.message))
fileMeta.full_name, apiResp.message))
return False return False
if not (apiResp.Ok or apiResp.NotFound): if not (apiResp.Ok or apiResp.NotFound):
logger.LogMessage('error', 'error removing original file from Ambar for {0}, unexpected response code {1} {2}'.format( logger.LogMessage('error', 'error removing original file from Ambar for {0}, unexpected response code {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
fileMeta.full_name, apiResp.code, apiResp.message))
return False return False
if apiResp.Ok: if apiResp.Ok:
@ -263,31 +305,25 @@ def ProcessFile(sha, fileId, meta, sourceId):
## tags ## tags
apiResp = apiProxy.RemoveAutoTags(fileId) apiResp = apiProxy.RemoveAutoTags(fileId)
if not apiResp.Success: if not apiResp.Success:
logger.LogMessage('error', 'error removing autotags {0} {1}'.format( logger.LogMessage('error', 'error removing autotags {0} {1}'.format(fileMeta.full_name, apiResp.message))
fileMeta.full_name, apiResp.message))
return False return False
if not apiResp.Ok: if not apiResp.Ok:
logger.LogMessage('error', 'error removing autotags, unexpected response code {0} {1} {2}'.format( logger.LogMessage('error', 'error removing autotags, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
fileMeta.full_name, apiResp.code, apiResp.message))
return False return False
autoTagger.AutoTagAmbarFile(ambarFile) autoTagger.AutoTagAmbarFile(ambarFile)
return True return True
except Exception as e: except Exception as e:
logger.LogMessage('error', 'error processing task {0} {1}'.format(sha, repr(e))) logger.LogMessage('error', 'error processing task {0}'.format(repr(e)))
return False return False
# main callback on receiving message from Rabbit # main callback on receiving message from Rabbit
def RabbitConsumeCallback(channel, method, properties, body): def RabbitConsumeCallback(channel, method, properties, body):
bodyObject = json.loads(body.decode('utf-8')) message = json.loads(body.decode('utf-8'))
sha = bodyObject['sha'] if (ProcessFile(message)):
fileId = bodyObject['fileId']
meta = bodyObject['meta']
sourceId = bodyObject['sourceId']
if (ProcessFile(sha, fileId, meta, sourceId)):
channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_ack(delivery_tag=method.delivery_tag)
else: else:
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False) channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

@ -1,4 +1,4 @@
[![Version](https://img.shields.io/badge/Version-v2.0.0rc-brightgreen.svg)](https://ambar.cloud) [![Version](https://img.shields.io/badge/Version-v2.1.8-brightgreen.svg)](https://ambar.cloud)
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/RD17/ambar/blob/master/License.txt) [![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/RD17/ambar/blob/master/License.txt)
[![Blog](https://img.shields.io/badge/Ambar%20Blog-%20Latest%20news%20and%20tutorials%20-brightgreen.svg)](https://blog.ambar.cloud) [![Blog](https://img.shields.io/badge/Ambar%20Blog-%20Latest%20news%20and%20tutorials%20-brightgreen.svg)](https://blog.ambar.cloud)
@ -49,6 +49,7 @@ Crawling is automatic, no schedule is needed since the crawler monitors fs event
* OpenOffice documents * OpenOffice documents
* RTF, Plaintext * RTF, Plaintext
* HTML / XHTML * HTML / XHTML
* Multithread processing (Only EE)
## Installation ## Installation
@ -94,6 +95,10 @@ It's limited by amount of RAM on your machine, typically it's 500MB. It's an awe
### I have a problem what should I do? ### I have a problem what should I do?
Request a dedicated support session by mailing us on hello@ambar.cloud Request a dedicated support session by mailing us on hello@ambar.cloud
## Sponsors
- [IFIC.co.uk](http://www.ific.co.uk/)
## Change Log ## Change Log
[Change Log](https://github.com/RD17/ambar/blob/master/CHANGELOG.md) [Change Log](https://github.com/RD17/ambar/blob/master/CHANGELOG.md)
@ -101,4 +106,4 @@ Request a dedicated support session by mailing us on hello@ambar.cloud
[Privacy Policy](https://github.com/RD17/ambar/blob/master/privacy-policy.md) [Privacy Policy](https://github.com/RD17/ambar/blob/master/privacy-policy.md)
## License ## License
[MIT License](https://github.com/RD17/ambar/blob/master/License.txt) [MIT License](https://github.com/RD17/ambar/blob/master/license.txt)

@ -59,6 +59,17 @@ export default ({ storage }) => {
.catch(next) .catch(next)
}) })
/**
* Cache processed meta id
*/
api.post('/meta/:metaId/processed', (req, res) => {
const { params: { metaId } } = req
CacheProxy.addMetaId(storage.redis, metaId)
res.sendStatus(200)
})
/** /**
* Enqueue meta for specified sha (enqueuing message to pipeline) * Enqueue meta for specified sha (enqueuing message to pipeline)
*/ */
@ -77,9 +88,9 @@ export default ({ storage }) => {
return return
} }
QueueProxy.enqueuePipelineMessage(storage, { sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta }) QueueProxy.enqueuePipelineMessage(storage, { event: 'add', sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta })
.then(() => { .then(() => {
CacheProxy.addMetaId(storage.redis, meta.id) //CacheProxy.addMetaId(storage.redis, meta.id)
res.sendStatus(200) res.sendStatus(200)
}) })
.catch(next) .catch(next)

@ -9,11 +9,6 @@ export const AMBAR_PIPELINE_WAITING_EXCHANGE = "AMBAR_PIPELINE_WAITING_EXCHANGE"
export const AMBAR_PIPELINE_WAITING_QUEUE_TTL = 60 * 60 * 1000 export const AMBAR_PIPELINE_WAITING_QUEUE_TTL = 60 * 60 * 1000
export const AMBAR_CRAWLER_QUEUE = "AMBAR_CRAWLER_QUEUE"
export const AMBAR_CRAWLER_EXCHANGE = "AMBAR_CRAWLER_EXCHANGE"
export const AMBAR_CRAWLER_MESSAGE_DEFAULT_TTL = 10 * 1000
const getPipelineMessagePriority = (storage, fileName) => new Promise((resolve) => { const getPipelineMessagePriority = (storage, fileName) => new Promise((resolve) => {
const regex = /(\.jp[e]*g$)|(\.png$)|(\.bmp$)|(\.tif[f]*$)|(\.pdf$)/i const regex = /(\.jp[e]*g$)|(\.png$)|(\.bmp$)|(\.tif[f]*$)|(\.pdf$)/i
const priority = regex.test(fileName) ? 1 : 2 const priority = regex.test(fileName) ? 1 : 2
@ -36,17 +31,6 @@ export const enqueuePipelineMessage = (storage, message) => new Promise((resolve
.catch(err => reject(err)) .catch(err => reject(err))
}) })
export const enqueueCrawlerMessage = (storage, message, ttl = AMBAR_CRAWLER_MESSAGE_DEFAULT_TTL) => new Promise((resolve, reject) => {
storage.rabbit.createConfirmChannel()
.then(channel => {
channel.publish(AMBAR_CRAWLER_EXCHANGE, '', Buffer.from(JSON.stringify(message)), { expiration: ttl })
return channel.waitForConfirms()
.then(() => channel.close())
})
.then(() => resolve())
.catch(err => reject(err))
})
export const initRabbit = new Promise((resolve, reject) => { export const initRabbit = new Promise((resolve, reject) => {
amqp.connect(`${config.rabbitHost}?heartbeat=60`) amqp.connect(`${config.rabbitHost}?heartbeat=60`)
.then((conn) => { .then((conn) => {
@ -68,10 +52,6 @@ export const initRabbit = new Promise((resolve, reject) => {
AMBAR_PIPELINE_EXCHANGE)) AMBAR_PIPELINE_EXCHANGE))
.then(() => channel.bindQueue(AMBAR_PIPELINE_WAITING_QUEUE, .then(() => channel.bindQueue(AMBAR_PIPELINE_WAITING_QUEUE,
AMBAR_PIPELINE_WAITING_EXCHANGE)) AMBAR_PIPELINE_WAITING_EXCHANGE))
.then(() => channel.assertExchange(AMBAR_CRAWLER_EXCHANGE, 'fanout', { durable: false }))
.then(() => channel.assertQueue(AMBAR_CRAWLER_QUEUE, { durable: false }))
.then(() => channel.bindQueue(AMBAR_CRAWLER_QUEUE,
AMBAR_CRAWLER_EXCHANGE))
.then(() => channel.close()) .then(() => channel.close())
) )
.then(() => resolve(conn)) .then(() => resolve(conn))

@ -20,8 +20,8 @@ export const buildMeta = (data) => {
const meta = { const meta = {
id: generateMetaId(source_id, full_name, created_datetime, updated_datetime), id: generateMetaId(source_id, full_name, created_datetime, updated_datetime),
short_name: short_name.toLowerCase(), short_name: short_name,
full_name: full_name.toLowerCase(), full_name: full_name,
source_id: source_id, source_id: source_id,
extension: extension, extension: extension,
created_datetime: created_datetime, created_datetime: created_datetime,
@ -35,8 +35,8 @@ export const buildMeta = (data) => {
export const buildShortMeta = (shortName, sourceId) => { export const buildShortMeta = (shortName, sourceId) => {
const short_name = shortName.toLowerCase() const short_name = shortName
const full_name = `//${sourceId.toLowerCase()}/${shortName.toLowerCase()}` const full_name = `//${sourceId}/${shortName}`
const source_id = sourceId const source_id = sourceId
let extension = '' let extension = ''
let calculatedExtension = FILE_EXTENSION_REGEX.exec(short_name) let calculatedExtension = FILE_EXTENSION_REGEX.exec(short_name)

@ -1,6 +1,6 @@
{ {
"name": "ambar-webapi", "name": "ambar-webapi",
"version": "1.3.0", "version": "2.1.0",
"description": "Ambar WebAPI", "description": "Ambar WebAPI",
"main": "dist", "main": "dist",
"scripts": { "scripts": {
@ -35,6 +35,7 @@
"eslint-plugin-promise": "^3.3.0", "eslint-plugin-promise": "^3.3.0",
"express": "^4.13.3", "express": "^4.13.3",
"gridfs-stream": "^1.1.1", "gridfs-stream": "^1.1.1",
"idempotent-babel-polyfill": "^0.1.1",
"minimist": "^1.2.0", "minimist": "^1.2.0",
"moment": "^2.15.0", "moment": "^2.15.0",
"mongodb": "^2.2.10", "mongodb": "^2.2.10",

@ -3,14 +3,20 @@ import ErrorResponse from '../utils/ErrorResponse'
import { import {
CryptoService, CryptoService,
EsProxy, EsProxy,
CacheProxy,
GridFsProxy, GridFsProxy,
MongoProxy, MongoProxy,
FileUploader, FileUploader,
QueueProxy QueueProxy,
CacheProxy
} from '../services' } from '../services'
import * as MetaBuilder from '../utils/MetaBuilder' import * as MetaBuilder from '../utils/MetaBuilder'
import config from '../config'
import request from 'request'
const DOWNLOAD_URL_REGEX = /^\/\/([^/]+)\/(.*)$/i
const generateFileId = (source_id, full_name) => { const generateFileId = (source_id, full_name) => {
return CryptoService.getSha256(`${source_id}${full_name}`) return CryptoService.getSha256(`${source_id}${full_name}`)
} }
@ -20,6 +26,58 @@ const generateExtractedTextFileName = (sha) => `text_${sha}`
export default ({ storage }) => { export default ({ storage }) => {
let api = Router() let api = Router()
api.get('/download', (req, res, next) => {
const filePath = req.query.path
const sha = req.query.sha
if (!filePath && !sha) {
res.sendStatus(400)
return
}
if (sha) {
GridFsProxy.checkIfFileExists(storage.mongoDb, sha)
.then(fileExsists => {
if (!fileExsists) {
res.status(404).json(new ErrorResponse('File content not found'))
return
}
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Disposition': `attachment; filename*=UTF-8''${encodeURIComponent(sha)}`
})
GridFsProxy
.downloadFile(storage.mongoDb, sha)
.on('error', (err) => {
console.log('err during downloading by sha', err)
res.end()
})
.pipe(res)
})
.catch(next)
} else if (filePath) {
const match = DOWNLOAD_URL_REGEX.exec(filePath)
if (!match) {
res.sendStatus(400)
return
}
const { 1: crawlerName, 2: crawlerFilePath } = match
request
.get(`http://${crawlerName}:${config.crawlerPort}/api/download?path=${encodeURIComponent(crawlerFilePath)}`)
.on('error', (err) => {
console.log('err during downloading by path', err)
res.end()
})
.pipe(res)
}
})
//////////////// CALLED FROM UI /////////////////////////////////////////// //////////////// CALLED FROM UI ///////////////////////////////////////////
/** /**
* @api {get} api/files/:uri Download File Content by Secure Uri * @api {get} api/files/:uri Download File Content by Secure Uri
@ -186,9 +244,9 @@ export default ({ storage }) => {
return GridFsProxy.uploadFile(storage.mongoDb, sha, fileContent) return GridFsProxy.uploadFile(storage.mongoDb, sha, fileContent)
} }
}) })
.then(() => QueueProxy.enqueuePipelineMessage(storage, { sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta })) .then(() => QueueProxy.enqueuePipelineMessage(storage, { event: 'add', sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta }))
.then(() => { .then(() => {
CacheProxy.addMetaId(storage.redis, meta.id) // CacheProxy.addMetaId(storage.redis, meta.id)
res.status(200).json({ fileId: generateFileId(meta.source_id, meta.full_name) }) res.status(200).json({ fileId: generateFileId(meta.source_id, meta.full_name) })
}) })
.catch(next) .catch(next)
@ -211,14 +269,16 @@ export default ({ storage }) => {
api.put('/hide/:fileId', (req, res, next) => { api.put('/hide/:fileId', (req, res, next) => {
const fileId = req.params.fileId const fileId = req.params.fileId
EsProxy.checkIfFileExists(storage.elasticSearch, fileId) EsProxy.getFileByFileId(storage.elasticSearch, fileId)
.then(fileExists => { .then(file => {
if (!fileExists) { if (!file) {
res.sendStatus(404) res.sendStatus(404)
return return
} }
return EsProxy.hideFile(storage.elasticSearch, fileId) CacheProxy.removeMetaId(storage.redis, file.meta.id)
return EsProxy.hideFile(storage.elasticSearch, file.file_id)
.then(() => res.sendStatus(200)) .then(() => res.sendStatus(200))
}) })
.catch(next) .catch(next)
@ -241,26 +301,17 @@ export default ({ storage }) => {
api.put('/unhide/:fileId', (req, res, next) => { api.put('/unhide/:fileId', (req, res, next) => {
const fileId = req.params.fileId const fileId = req.params.fileId
EsProxy.checkIfFileExists(storage.elasticSearch, fileId) EsProxy.unHideFile(storage.elasticSearch, fileId)
.then(fileExists => {
if (!fileExists) {
res.sendStatus(404)
return
}
return EsProxy.unHideFile(storage.elasticSearch, fileId)
.then(() => res.sendStatus(200)) .then(() => res.sendStatus(200))
.catch(err => { .catch(err => {
if ((err.statusCode) && (err.statusCode == 404)) { if ((err.statusCode) && (err.statusCode == 404)) {
res.sendStatus(200) res.sendStatus(404)
return return
} }
throw new Error(err) next(err)
}) })
}) })
.catch(next)
})
return api return api
} }

@ -13,6 +13,7 @@ const defaultConfig = {
"rabbitHost": "amqp://ambar", "rabbitHost": "amqp://ambar",
"uiLang": "en", "uiLang": "en",
"analyticsToken": "", "analyticsToken": "",
"crawlerPort": 8082
} }
const intParamsList = ['localPort'] const intParamsList = ['localPort']

@ -1,3 +1,4 @@
import 'idempotent-babel-polyfill'
import http from 'http' import http from 'http'
import express from 'express' import express from 'express'
import cors from 'cors' import cors from 'cors'

@ -3,6 +3,7 @@ import { EsProxy, DateTimeService } from './index'
const TAGS_HASH_NAME = 'tags' const TAGS_HASH_NAME = 'tags'
export const addMetaId = (redis, metaId) => { redis.set(`meta:${metaId}`, DateTimeService.getCurrentDateTime()) } export const addMetaId = (redis, metaId) => { redis.set(`meta:${metaId}`, DateTimeService.getCurrentDateTime()) }
export const removeMetaId = (redis, metaId) => { redis.del(`meta:${metaId}`) }
export const checkIfTokenExists = (redis, token) => redis.getAsync(token) export const checkIfTokenExists = (redis, token) => redis.getAsync(token)
export const addToken = (redis, token, ttlSeconds) => { export const addToken = (redis, token, ttlSeconds) => {

@ -10,7 +10,7 @@ const getPipelineMessagePriority = (storage, fileName) => new Promise((resolve)
}) })
export const enqueuePipelineMessage = (storage, message) => new Promise((resolve, reject) => { export const enqueuePipelineMessage = (storage, message) => new Promise((resolve, reject) => {
const fileName = message.meta.short_name const fileName = message.fileName || message.meta.short_name
storage.rabbit.createConfirmChannel() storage.rabbit.createConfirmChannel()
.then(channel => { .then(channel => {

@ -8,8 +8,8 @@ const generateMetaId = (source_id, full_name, created_datetime, updated_datetime
export const buildShortMeta = (shortName, sourceId) => { export const buildShortMeta = (shortName, sourceId) => {
const short_name = shortName.toLowerCase() const short_name = shortName
const full_name = `//${sourceId.toLowerCase()}/${shortName.toLowerCase()}` const full_name = `//${sourceId}/${shortName}`
const source_id = sourceId const source_id = sourceId
let extension = '' let extension = ''
let calculatedExtension = FILE_EXTENSION_REGEX.exec(short_name) let calculatedExtension = FILE_EXTENSION_REGEX.exec(short_name)

@ -707,6 +707,14 @@ babel-polyfill@^6.16.0:
core-js "^2.4.0" core-js "^2.4.0"
regenerator-runtime "^0.9.5" regenerator-runtime "^0.9.5"
babel-polyfill@^6.26.0:
version "6.26.0"
resolved "http://192.168.1.113:4873/babel-polyfill/-/babel-polyfill-6.26.0.tgz#379937abc67d7895970adc621f284cd966cf2153"
dependencies:
babel-runtime "^6.26.0"
core-js "^2.5.0"
regenerator-runtime "^0.10.5"
babel-preset-es2015@^6.9.0: babel-preset-es2015@^6.9.0:
version "6.18.0" version "6.18.0"
resolved "https://registry.yarnpkg.com/babel-preset-es2015/-/babel-preset-es2015-6.18.0.tgz#b8c70df84ec948c43dcf2bf770e988eb7da88312" resolved "https://registry.yarnpkg.com/babel-preset-es2015/-/babel-preset-es2015-6.18.0.tgz#b8c70df84ec948c43dcf2bf770e988eb7da88312"
@ -790,6 +798,13 @@ babel-runtime@^6.0.0, babel-runtime@^6.11.6, babel-runtime@^6.9.0, babel-runtime
core-js "^2.4.0" core-js "^2.4.0"
regenerator-runtime "^0.9.5" regenerator-runtime "^0.9.5"
babel-runtime@^6.26.0:
version "6.26.0"
resolved "http://192.168.1.113:4873/babel-runtime/-/babel-runtime-6.26.0.tgz#965c7058668e82b55d7bfe04ff2337bc8b5647fe"
dependencies:
core-js "^2.4.0"
regenerator-runtime "^0.11.0"
babel-template@^6.14.0, babel-template@^6.15.0, babel-template@^6.16.0, babel-template@^6.8.0: babel-template@^6.14.0, babel-template@^6.15.0, babel-template@^6.16.0, babel-template@^6.8.0:
version "6.16.0" version "6.16.0"
resolved "https://registry.yarnpkg.com/babel-template/-/babel-template-6.16.0.tgz#e149dd1a9f03a35f817ddbc4d0481988e7ebc8ca" resolved "https://registry.yarnpkg.com/babel-template/-/babel-template-6.16.0.tgz#e149dd1a9f03a35f817ddbc4d0481988e7ebc8ca"
@ -1109,6 +1124,10 @@ core-js@^2.4.0:
version "2.4.1" version "2.4.1"
resolved "https://registry.yarnpkg.com/core-js/-/core-js-2.4.1.tgz#4de911e667b0eae9124e34254b53aea6fc618d3e" resolved "https://registry.yarnpkg.com/core-js/-/core-js-2.4.1.tgz#4de911e667b0eae9124e34254b53aea6fc618d3e"
core-js@^2.5.0:
version "2.5.6"
resolved "http://192.168.1.113:4873/core-js/-/core-js-2.5.6.tgz#0fe6d45bf3cac3ac364a9d72de7576f4eb221b9d"
core-util-is@~1.0.0: core-util-is@~1.0.0:
version "1.0.2" version "1.0.2"
resolved "https://registry.yarnpkg.com/core-util-is/-/core-util-is-1.0.2.tgz#b5fd54220aa2bc5ab57aab7140c940754503c1a7" resolved "https://registry.yarnpkg.com/core-util-is/-/core-util-is-1.0.2.tgz#b5fd54220aa2bc5ab57aab7140c940754503c1a7"
@ -1785,6 +1804,12 @@ iconv-lite@^0.4.17:
dependencies: dependencies:
safer-buffer "^2.1.0" safer-buffer "^2.1.0"
idempotent-babel-polyfill@^0.1.1:
version "0.1.1"
resolved "http://192.168.1.113:4873/idempotent-babel-polyfill/-/idempotent-babel-polyfill-0.1.1.tgz#f85d2ecaf36b05a652457b25c804289512360d94"
dependencies:
babel-polyfill "^6.26.0"
ignore-by-default@^1.0.0: ignore-by-default@^1.0.0:
version "1.0.1" version "1.0.1"
resolved "https://registry.yarnpkg.com/ignore-by-default/-/ignore-by-default-1.0.1.tgz#48ca6d72f6c6a3af00a9ad4ae6876be3889e2b09" resolved "https://registry.yarnpkg.com/ignore-by-default/-/ignore-by-default-1.0.1.tgz#48ca6d72f6c6a3af00a9ad4ae6876be3889e2b09"
@ -2703,6 +2728,14 @@ regenerate@^1.2.1:
version "1.3.2" version "1.3.2"
resolved "https://registry.yarnpkg.com/regenerate/-/regenerate-1.3.2.tgz#d1941c67bad437e1be76433add5b385f95b19260" resolved "https://registry.yarnpkg.com/regenerate/-/regenerate-1.3.2.tgz#d1941c67bad437e1be76433add5b385f95b19260"
regenerator-runtime@^0.10.5:
version "0.10.5"
resolved "http://192.168.1.113:4873/regenerator-runtime/-/regenerator-runtime-0.10.5.tgz#336c3efc1220adcedda2c9fab67b5a7955a33658"
regenerator-runtime@^0.11.0:
version "0.11.1"
resolved "http://192.168.1.113:4873/regenerator-runtime/-/regenerator-runtime-0.11.1.tgz#be05ad7f9bf7d22e056f9726cee5017fbf19e2e9"
regenerator-runtime@^0.9.5: regenerator-runtime@^0.9.5:
version "0.9.6" version "0.9.6"
resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.9.6.tgz#d33eb95d0d2001a4be39659707c51b0cb71ce029" resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.9.6.tgz#d33eb95d0d2001a4be39659707c51b0cb71ce029"

@ -13,8 +13,6 @@ services:
- ${dataPath}/db:/data/db - ${dataPath}/db:/data/db
expose: expose:
- "27017" - "27017"
ports:
- "27017:27017"
es: es:
restart: always restart: always
networks: networks:
@ -22,8 +20,6 @@ services:
image: ambar/ambar-es:latest image: ambar/ambar-es:latest
expose: expose:
- "9200" - "9200"
ports:
- "9200:9200"
environment: environment:
- cluster.name=ambar-es - cluster.name=ambar-es
- ES_JAVA_OPTS=-Xms2g -Xmx2g - ES_JAVA_OPTS=-Xms2g -Xmx2g
@ -47,9 +43,6 @@ services:
expose: expose:
- "15672" - "15672"
- "5672" - "5672"
ports:
- "15672:15672"
- "5672:5672"
volumes: volumes:
- ${dataPath}/rabbit:/var/lib/rabbitmq - ${dataPath}/rabbit:/var/lib/rabbitmq
redis: redis:
@ -61,8 +54,6 @@ services:
image: ambar/ambar-redis:latest image: ambar/ambar-redis:latest
expose: expose:
- "6379" - "6379"
ports:
- "6379:6379"
serviceapi: serviceapi:
depends_on: depends_on:
redis: redis:
@ -79,8 +70,6 @@ services:
image: ambar/ambar-serviceapi:latest image: ambar/ambar-serviceapi:latest
expose: expose:
- "8081" - "8081"
ports:
- "8081:8081"
environment: environment:
- mongoDbUrl=mongodb://db:27017/ambar_data - mongoDbUrl=mongodb://db:27017/ambar_data
- elasticSearchUrl=http://es:9200 - elasticSearchUrl=http://es:9200
@ -88,8 +77,6 @@ services:
- redisPort=6379 - redisPort=6379
- rabbitHost=amqp://rabbit - rabbitHost=amqp://rabbit
- langAnalyzer=${langAnalyzer} - langAnalyzer=${langAnalyzer}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
webapi: webapi:
depends_on: depends_on:
serviceapi: serviceapi:
@ -111,8 +98,6 @@ services:
- redisPort=6379 - redisPort=6379
- serviceApiUrl=http://serviceapi:8081 - serviceApiUrl=http://serviceapi:8081
- rabbitHost=amqp://rabbit - rabbitHost=amqp://rabbit
volumes:
- /var/run/docker.sock:/var/run/docker.sock
frontend: frontend:
depends_on: depends_on:
webapi: webapi:
@ -139,7 +124,7 @@ services:
- id=0 - id=0
- api_url=http://serviceapi:8081 - api_url=http://serviceapi:8081
- rabbit_host=amqp://rabbit - rabbit_host=amqp://rabbit
crawler0: ${crawlerName}:
depends_on: depends_on:
serviceapi: serviceapi:
condition: service_healthy condition: service_healthy
@ -147,9 +132,9 @@ services:
restart: always restart: always
networks: networks:
- internal_network - internal_network
expose:
- "8082"
environment: environment:
- apiUrl=http://serviceapi:8081
- crawlPath=/usr/data
- name=${crawlerName} - name=${crawlerName}
volumes: volumes:
- ${pathToCrawl}:/usr/data - ${pathToCrawl}:/usr/data

Loading…
Cancel
Save