62 votes

Méthode recommandée pour interroger un grand nombre d'entités ndb à partir du magasin de données

J'ai couru dans une intéressante limite avec l'App Moteur de base de données. Je suis entrain de créer un handler pour nous aider à analyser des données d'utilisation sur un de nos serveurs de production. Pour effectuer l'analyse j'ai besoin d'interroger et de résumer de+ de 10 000 entités extraites de la banque de données. Le calcul n'est pas difficile, il est juste un histogramme des éléments qui passent par un filtre spécifique de l'utilisation des échantillons. Le problème que je rencontre c'est que je ne peux pas récupérer les données de la banque de données assez rapide pour faire le traitement avant de frapper la requête date limite.

J'ai essayé tout ce que je pense de segmenter la requête en parallèle des appels RPC pour améliorer les performances, mais selon appstats je n'arrive pas à obtenir les requêtes à l'exécution en parallèle. Peu importe la méthode que j'essaie (voir ci-dessous), il semble toujours que le cpr a fait retomber en cascade séquentiel suivant les requêtes.

Remarque: la requête et d'analyse de code fonctionne, il fonctionne lentement parce que je ne peux pas obtenir rapidement des données de la banque de données.

Arrière-plan

Je n'ai pas une version live, je peux partager, mais ici, c'est le modèle de base pour la partie du système dont je parle:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

Vous pouvez penser à des échantillons comme des moments où un utilisateur fait usage d'une capacité d'un nom donné. (ex: "systemA.feature_x'). Les balises sont fondées sur des informations du client, système d'information et la fonctionnalité. ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). Donc les balises forme d'un dénormalisée ensemble de jetons, qui pourraient être utilisés pour trouver des échantillons d'intérêt.

L'analyse que je suis en train de faire consiste à prendre une plage de dates et de demander combien de fois a été une caractéristique de l'ensemble des fonctionnalités (peut-être tous éléments) utilisé par jour (ou par heure) par compte client (entreprise, et non par utilisateur).

Donc l'entrée au gestionnaire être quelque chose comme:

  • Date De Début De
  • Date De Fin
  • Tag(s)

La sortie serait:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Code commun pour les Requêtes

Voici un peu de code en commun pour toutes les requêtes. La structure générale du gestionnaire est un simple gestionnaire d'obtenir l'aide de webapp2 qui définit les paramètres de la requête, exécute la requête, les processus, les résultats, crée des données de retour.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Méthodes Essayé

J'ai essayé une variété de méthodes pour tenter d'extraire des données de la banque de données aussi rapidement que possible et en parallèle. Les méthodes que j'ai essayé jusqu'à maintenant:

A. Seule Itération

Il s'agit plus d'une simple base de cas pour comparer par rapport aux autres méthodes. Je viens de créer la requête et itération sur tous les éléments de laisser ndb faire ce qu'il fait pour tirer d'eux l'un après l'autre.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Les Grandes Récupérer

L'idée ici était de voir si je pouvais faire un très grand effort.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Async extrait dans le temps de la gamme

L'idée ici est de reconnaître que les échantillons sont assez espacés dans le temps afin que je puisse créer un ensemble indépendant de requêtes qui a divisé le temps global de la région en morceaux et essayez d'exécuter chacune de ces en parallèle en utilisant async:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Async cartographie

J'ai essayé cette méthode, car la documentation fait sonner comme ndb peut exploiter certaines parallélisme automatiquement lors de l'utilisation de la Requête.map_async méthode.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Résultat

J'ai testé un exemple de requête pour recueillir des temps de réponse global et appstats traces. Les résultats sont les suivants:

A. Seule Itération

real: 15.645 s

Celui-ci passe successivement à travers de l'extraction de lots, l'un après l'autre, puis récupère chaque session de memcache.

Method A appstats

B. Les Grandes Récupérer

real: 12.12 s

Effectivement la même chose que l'option A, mais un peu plus rapide pour une raison quelconque.

Method B appstats

C. Async extrait dans le temps de la gamme

real: 15.251 s

Il semble apporter plus de parallélisme au début, mais semble avoir ralenti par une série d'appels à la prochaine au cours d'une itération des résultats. Aussi ne semble pas être en mesure de se chevaucher la session memcache recherches avec les requêtes en cours.

Method C appstats

D. Async cartographie

real: 13.752 s

Celui-ci est le plus difficile pour moi de comprendre. On dirait qu'il a q bonne affaire de chevauchement, mais tout semble s'étirer dans une chute d'eau au lieu d'en parallèle.

Method D appstats

Recommandations

Basé sur tout cela, ce qui me manque? Suis-je tout simplement de frapper une limite sur App Engine ou est-il une meilleure façon de tirer vers le bas grand nombre d'entités en parallèle?

Je suis à une perte quant à ce qu'à essayer la prochaine. J'ai pensé à la réécriture du client, de faire de multiples demandes d'app engine en parallèle, mais cela semble assez de force brute. Je voudrais vraiment s'attendre à ce que l'application du moteur doit être capable de gérer ce cas d'utilisation, donc je suppose que il y a quelque chose que je suis absent.

Mise à jour

En fin de compte j'ai trouvé que l'option C était le mieux pour mon cas. J'ai pu l'optimiser pour terminer en 6,1 secondes. Pas encore parfait, mais beaucoup mieux.

Après l'obtention de l'avis de plusieurs personnes, j'ai trouvé que les éléments suivants ont été la clé pour comprendre et garder à l'esprit:

  • Plusieurs requêtes peuvent s'exécuter en parallèle
  • Seulement 10 RPC peut être dans le vol à la fois
  • Essayez d'éliminer au point qu'il n'y a pas secondaire requêtes
  • Ce type de tâche est mieux à gauche de la carte de réduire et de files d'attente de tâches, pas les requêtes en temps réel

Donc, ce que j'ai fait pour le rendre plus rapide:

  • J'ai partitionné la requête de l'espace à partir du début basé sur le temps. (remarque: plus d'égalité les partitions sont en termes d'entités retournées, le mieux)
  • Je dénormalisée les données à supprimer la nécessité pour la session secondaire requête
  • J'ai fait usage de ndb opérations asynchrones et wait_any() pour chevaucher les requêtes avec le traitement

Je suis toujours pas à obtenir la performance, je attendre ou comme, mais il est réalisable pour l'instant. Je souhaite juste leur a une meilleure façon de tirer un grand nombre de séquentielles des entités en mémoire rapidement dans les gestionnaires.

8voto

mjibson Points 7176

Grand traitement comme cela ne devrait pas être fait à la demande d'un utilisateur, qui a un 60 limite de temps. Au lieu de cela, il doit être fait dans un contexte qui prend en charge à long demandes en cours d'exécution. La tâche de la file d'attente prend en charge les demandes jusqu'à 10 minutes, et (je crois) de la mémoire de contention (F1 cas, le défaut, de 128 MO de mémoire). Pour encore plus de limites (pas de délai d'attente de demande, 1GO+ de mémoire), utiliser le backend.

Voici quelque chose à essayer: mettre en place une URL que, lors de l'accès, les feux de l'exécution d'une tâche de la file d'attente des tâches. Il retourne une page web qui interroge toutes les ~5s vers une autre URL qui répond par vrai/faux si la tâche de la file d'attente des tâches a encore été terminé. La tâche de la file d'attente qui traite les données, ce qui peut prendre quelques 10s de secondes, et enregistre le résultat dans la base de données, soit en tant que données calculées ou une page web. Une fois la première page détecte qu'il a terminé, l'utilisateur est redirigé vers la page, qui va chercher le maintenant des résultats calculés à partir de la banque de données.

2voto

Martin Berends Points 2128

La nouvelle fonctionnalité expérimentale de traitement des données (une API AppEngine pour MapReduce) semble très bien adaptée à la résolution de ce problème. Il effectue un sharding automatique pour exécuter plusieurs processus de travail en parallèle.

0voto

dragonx Points 12362

De grandes opérations de données sur App Engine mis en œuvre en utilisant une sorte de mapreduce opération.

Voici une vidéo décrivant le processus, mais y compris les BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

Il ne sonne pas comme vous avez besoin de BigQuery, mais vous voudrez probablement utiliser la Carte et de Réduire les portions de la canalisation.

La principale différence entre ce que vous faites et le mapreduce situation, c'est que vous êtes le lancement d'une instance et d'une itération à travers les requêtes, où sur mapreduce, vous permettrait d'avoir une instance en cours d'exécution en parallèle pour chaque requête. Vous aurez besoin de réduire l'opération de "résumer" toutes les données, et d'écrire le résultat quelque part.

L'autre problème que vous avez est que vous devez utiliser les curseurs pour effectuer une itération. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

Si l'itérateur est à l'aide d'une requête de décalage, ça va être inefficace, car un décalage questions de la même requête, ignore un certain nombre de résultats, et vous donne la prochaine série, tandis que le curseur saute directement à la prochaine série.

0voto

stevep Points 574

Pas utile pour cela, mais quelque chose à méditer: je suppose que vous êtes l'obtention d'un grand nombre d'échantillons par session. Êtes-vous l'écriture d'un nouvel échantillon rec pour chaque RPC à partir de la session de client? Si donc, vous avez probablement eu quelques bonnes raisons pour cela. Pour les mêmes raisons, j'ai constaté que de recevoir et de transmettre l'échantillon cer grâce à une traction de la file d'attente peut bien fonctionner. La tâche dans la liste de la file d'attente loue un grand nombre d'échantillons, sérialise dans un TextProperty tous les échantillons ayant une session commune, et enregistre les données sérialisées dans le cadre d'une session d'enregistrement. Si vous avez un taux élevé d'échantillons par session, vous pouvez lop un zéro ou deux hors de votre 10K enregistrements.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X