Class ArchiveBuffer
- java.lang.Object
-
- java.lang.Thread
-
- de.bsvrz.pua.prot.processing.ProcessingBuffer
-
- de.bsvrz.pua.prot.processing.archivebuffer.ArchiveBuffer
-
- All Implemented Interfaces:
java.lang.Runnable
public class ArchiveBuffer extends ProcessingBuffer
Puffer für die Daten vom Archivsystem, die von der Datenaufbereitung angefordert werden. Der folgende Algorithmus wird für jeden Zeitbereich wiederholt:
Für jedes reale Element, d.h. für jedes reale Attribut und für jede Attributgruppe wird eine Archivanfrage über den Zeitbereich und Datenart gestellt. Der jeweils erste pro Element erhaltene Datensatz wird markiert. Anschließend werden die Datensätze verschränkt. Dabei wird ein "Gewinner" festgestellt. Der Gewinner ist das Element mit dem kleinsten Datenzeitstempel. Danach werden die Werte der übrigen Attribute aufgefüllt und als Ausgangsdatensatz in einem Puffer abgelegt. Im nächsten Schritt wird ein weiterer Datensatz des Gewinners abgerufen, markiert mit den Datensätzen der übrigen Elementen verschränkt und dabei ein neuer "Gewinner" bestimmt(*). Nun wird die Datenaufbereitung benachrichtigt, dass ein Ausgangsdatensatz vorliegt. Grund für diese Verzögerung ist, dass die Bearbeitung eines Ausgangsdatensatzes erst vollständig abgeschlossen ist, wenn der nächste Ausgangsdatensatz erzeugt wurde. Der in (*) gewonnene Ausgangsdatensatz wird im Puffer abgelegt, und der Algorithmus wiederholt sich bis alle Daten vom Archivsystem abgerufen wurden. Ausgangsdatensatz wird ebenfalls in einem Puffer abgelegt. Wird in einem eigenen Thread gestartet, da ggf. auf die Antwort des Archivsystems gewartet werden muss. Da Archivdaten streambasiert abgefragt werden können, stellt der ArchivBuffer sicher, dass sich im Puffer für die Ausgangsdatensätze nur eine bestimmte Anzahl von Einträgen ansammelt. Wird dieses Limit (MAX_THRESHOLD) erreicht, so stellt der Archivbuffer die arbeit ein, bis sich der Puffer wieder fast vollständig (MIN_THRESHOLD) geleert hat.
-
-
Field Summary
Fields Modifier and Type Field Description static intMAX_THRESHOLDAnzahl Ausgangsdatensätze, die im Ausgangspuffer liegen.static intMIN_THRESHOLDAnzahl Ausgangsdatensätze, die mindestens im Ausgangspuffer liegen sollten.-
Fields inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer
_buffer, _bufferResult, _connection, _dataSetBuilder, _debug, _done, _periods, _processingInformation, _realElements, _tempElements, INITIAL_RINGBUFFER_SIZE
-
-
Constructor Summary
Constructors Constructor Description ArchiveBuffer(de.bsvrz.dav.daf.main.ClientDavInterface dav, ProcessingInterface processor, de.bsvrz.dav.daf.main.config.ConfigurationObject configAuth, ProcessingInformation pi, java.util.List<de.bsvrz.sys.funclib.losb.util.Tuple<java.lang.Long,java.lang.Long>> periods, java.lang.String scriptName, de.bsvrz.dav.daf.accessControl.UserInfo userInfo)Startet den Online-ProcessingBuffer.
-
Method Summary
Modifier and Type Method Description booleanapplyAggregations(java.util.List<BaseDataSet> baseData)Führt die Aggregierungen durch.int[]getLinkedAttributes()Zeigt an welche Attribute durch die Aggregationspaltezusammengefasst werden.booleanhasData()Zeigt an ob Daten abgeholt werden können.voidinit()Sendet erste Anfragen an das Archivsystem.booleanisDone()Zeigt ob der Buffer noch weitere Daten liefern wird.protected booleanisDoneCollecting()Überprüft ob die Datensammlung abgeschlossen ist.protected booleanisListAggregation()Werden nicht aggregierte Daten versendet?protected voidprocessNewData(ValueProvider[] elements, java.util.ArrayList<ValueProvider> winners)Bereitet aus den Ergebnisdatensätzen die Ergebnisdaten auf.protected voidrequestData()Stellt Archivanfragen für alle realen Elemente mit den momentan engetragenen Werten von_archiveUservoidstoreAggregatedData(byte status)Falls Aggregationsdatensätze vorhanden sind, werden sie in den Ausgangspuffer gelegt.IntermediateDataSettake()Liefert einen Ausgangsdatensatz zurück.-
Methods inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer
abort, applyPostFilter, getInsertEmpty, getProtocolType, getResult, getScriptName, getTimeStampOrigin, getWinners, isAbort, isAggregate, nextInterval, notifyProcessor, run, setAggregate, setDone, setTempAttributes, size, storeDataSet
-
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, onSpinWait, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, suspend, toString, yield
-
-
-
-
Constructor Detail
-
ArchiveBuffer
public ArchiveBuffer(de.bsvrz.dav.daf.main.ClientDavInterface dav, ProcessingInterface processor, de.bsvrz.dav.daf.main.config.ConfigurationObject configAuth, ProcessingInformation pi, java.util.List<de.bsvrz.sys.funclib.losb.util.Tuple<java.lang.Long,java.lang.Long>> periods, java.lang.String scriptName, de.bsvrz.dav.daf.accessControl.UserInfo userInfo) throws de.bsvrz.sys.funclib.losb.exceptions.FailureExceptionStartet den Online-ProcessingBuffer. Meldet sich für jede benötigten Datenidentifikation beim Datenverteiler an.init()sollte unverzüglich nach Erzeugen des Objekts aufgerufen werden.- Parameters:
dav- Verbindung zum Datenverteilerprocessor- Objekt das die Datenaufbereitung durchführt. Wird jedes Mal benachrichtigt, wenn ein Ausgangsdatensatz vorliegt.configAuth- Konfigurationsverantwortlicher, dessen Archivsystem verwendet wird.pi- Informationen zur Datenaufbereitung. Die Zeitbereiche müssen bereits sortiert und zusammengefasst sein!periods- Zeitbereiche, in denen der Archivbuffer Daten sammeln soll. Inhalt wird geändert!scriptName- Bezeichnung des Skripts zur Verwendung in Fehlerausgaben.userInfo- Benutzer für Rechteprüfung- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException- Fehler bei der Kommunikation mit der Konfiguration
-
-
Method Detail
-
init
public void init() throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedExceptionSendet erste Anfragen an das Archivsystem. Startet Anschließend einen Thread, der die Daten vom Archivsystem entgegennimmt. Falls es zu einem Fehler bei der Archivanfrage kommt, wird der Thread nicht gestartet, undProcessingBuffer.abort()aufgerufen.- Specified by:
initin classProcessingBuffer- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException- Fehler bei der Archivanfrage.java.lang.InterruptedException- Warten auf Archivantwort wurde unterbrochen.- See Also:
ProcessingBuffer.init()
-
applyAggregations
public boolean applyAggregations(java.util.List<BaseDataSet> baseData)
Description copied from class:ProcessingBufferFührt die Aggregierungen durch. Die Aggregierungen werden jedoch nur durchgeführt, fallsProcessingBuffer.isAggregate()trueliefert.- Specified by:
applyAggregationsin classProcessingBuffer- Parameters:
baseData- Werte des Ausgangsdatensatz. Einträge können von der Methode geändert werden.- Returns:
true: Der Ausgangsdatensatz soll nachgefiltert und ausgegeben werden.- See Also:
ProcessingBuffer.applyAggregations(List)
-
requestData
protected void requestData() throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedExceptionStellt Archivanfragen für alle realen Elemente mit den momentan engetragenen Werten von_archiveUser- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException- Fehler bei der Archivanfrage.java.lang.InterruptedException- Warten auf Antwortdatensatz wurde unterbrochen.
-
isDoneCollecting
protected boolean isDoneCollecting() throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedExceptionÜberprüft ob die Datensammlung abgeschlossen ist.- Specified by:
isDoneCollectingin classProcessingBuffer- Returns:
Truefalls die Datensammlung abgeschlossen ist. Überprüft zudem, wieviele Elemente sich in der Warteschlange befinden. Ist die Warteschlange bereits überMAX_THRESHOLDgefüllt, so wird der ArchivBuffer angehalten, bis die Queue wieder fast vollständig (MIN_THRESHOLD) geelert ist.- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException- Fehler bei der Archivanfrage. Es wird eine Archivanfrage gestellt, wenn Daten für ein weiteres Intervall angefragt werden.java.lang.InterruptedException- Warten auf Antwortdatensatz wurde unterbrochen.- See Also:
ProcessingBuffer.isDoneCollecting()
-
isDone
public boolean isDone()
Description copied from class:ProcessingBufferZeigt ob der Buffer noch weitere Daten liefern wird.- Specified by:
isDonein classProcessingBuffer- Returns:
truefalls noch weitere Daten zu erwarten sind.- See Also:
ProcessingBuffer.isDone()
-
hasData
public boolean hasData()
Zeigt an ob Daten abgeholt werden können.- Specified by:
hasDatain classProcessingBuffer- Returns:
truefalls Daten mittelstake()abgeholt werden können.
-
take
public IntermediateDataSet take() throws java.lang.InterruptedException
Description copied from class:ProcessingBufferLiefert einen Ausgangsdatensatz zurück. Setzt dabei das 'zeitdauer' Attribut. Liegt keiner vor, kehrt die Methode sofort mit dem Rückgabewert null zurück.- Overrides:
takein classProcessingBuffer- Returns:
- Ausgangsdatensatz und Status oder null, falls keiner vorliegt.
- Throws:
java.lang.InterruptedException- Warten auf Ausgangsdatensatz wurde unterbrochen.- See Also:
ProcessingBuffer.take()
-
isListAggregation
protected boolean isListAggregation()
Description copied from class:ProcessingBufferWerden nicht aggregierte Daten versendet?- Specified by:
isListAggregationin classProcessingBuffer- Returns:
truefallsListeeine der ausgewählten die Aggregationsanwendungen ist.- See Also:
ProcessingBuffer.isListAggregation()
-
getLinkedAttributes
public int[] getLinkedAttributes()
Zeigt an welche Attribute durch die Aggregationspaltezusammengefasst werden. Zusammengehörige Spalten werden duch die gleichen Nummern gekennzeichnet.- Returns:
- Zusammen gehörende Spalten.
nullfalls diespaltenAggregation nicht verwendet wird.
-
storeAggregatedData
public void storeAggregatedData(byte status)
Description copied from class:ProcessingBufferFalls Aggregationsdatensätze vorhanden sind, werden sie in den Ausgangspuffer gelegt.- Specified by:
storeAggregatedDatain classProcessingBuffer- Parameters:
status- Status, den der Aggregationsdatensatz erhalten soll.- See Also:
ProcessingBuffer.storeAggregatedData(byte)
-
processNewData
protected void processNewData(ValueProvider[] elements, java.util.ArrayList<ValueProvider> winners) throws de.bsvrz.sys.funclib.losb.exceptions.FailureException, java.lang.InterruptedException
Description copied from class:ProcessingBufferBereitet aus den Ergebnisdatensätzen die Ergebnisdaten auf.- Specified by:
processNewDatain classProcessingBuffer- Parameters:
elements- Liste aller Datenlieferanten.winners- Rückgabe: Liste der Datensätze mit minimalem, nicht aufgefüllten Datenzeitstempel. (Es sollte eine leere ArrayList übergeben werden)- Throws:
de.bsvrz.sys.funclib.losb.exceptions.FailureException- Fehler bei der Aufbereitung.java.lang.InterruptedException- Aufbereitung wurde unterbrochen.- See Also:
ProcessingBuffer.processNewData(ValueProvider[], ArrayList)
-
-