When you read an execution plan you’re probably trying to identify the steps that Oracle went through to acquire the final result set so that you can decide whether or not there is a more efficient way of getting the same result.
For a serial execution plan this typically means you have to identify the join order, join methods and access methods together with the point at which each predicate was applied. Generally this isn’t too difficult, although subquery pushing (and a couple of presentation bugs relating to other uses of scalar subqueries) can cause a little confusion; and the difference between join order and the order of operation can be slightly obscured when considering hash joins.
Parallel execution plans are harder, though, because you really need to understand the impact of the order of operation, the distribution mechanisms chosen and (in recent versions of Oracle) the timing of the generation and use of Bloom filters. The topic is stunningly large, and very easy to describe badly; it’s also too easy to make generalisations that turn out to be untrue or (at least) sufficiently incomplete as to be misleading. Rather than attempting to cover the topic in one note, I’ll be writing a short series.
I’ll be basing my comments on the two execution plans I produced for a recent question on Bloom filters, so you may want to read that article and its subsequent comments first (the link will open in a new window).
Preamble
The ideal target for parallel execution is linear scalability. In other words, if you execute a statement with parallel degree N then the workload should be shared N ways with no interference or duplication and the job should complete in one Nth of the time. There are, immediately, four caveats as far as Oracle’s implementation is concerned:
- there’s a setup overhead for parallel execution that eats into the time-saving (so we shouldn’t bother using parallel execution on queries that are only going to take a couple of seconds to run serially).
- in the case of a parallel query all the data has to pass to the front-end through the query co-ordinator (QC), so there’s an implicit bottleneck that can conceal a huge fraction of the time-saving
- a statement running at degree N is typically using 2*N parallel execution slaves, so you might wonder whether you deserve a “2*N” improvement in response time, rather than just N
- if you’re running on the Exadata database machine, a very large I/O requirement can be split across all the cell (storage) servers concurrently, so even a serial query on a full rack can, in effect, get pretty close to executing at parallel degree 14 (provided you don’t want to do very much with the data once you’ve ripped it from the storage servers).
Having mentioned the caveats, I’m going to ignore them and stick to a simplified image of two sets of parallel query slaves engaged in some fairly simple query processing and ask how Oracle shares the work while avoiding contention and duplicated effort.
Key to doing “1/Nth” of the work is the attempt to get each parallel execution (PX) slave to do 1/Nth of the necessary disk I/O – so for a tablescan the query co-ordinator cuts the table into N pieces (technically it actually tends to use 13 * N pieces so that it can share N ways while allowing for an imbalance in content – I suspect the 13 comes from a historic 9:3:1 strategy). For access to partitioned tables the query co-ordinator may share out the partitions. In either case the co-ordinator can arrange to share the disk I/O fairly without collision or duplication of effort. That’s a basic first step.
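A minimal sketch of what this looks like in practice (the table name t1 is hypothetical, and I’ve picked degree 4 arbitrarily): the PX BLOCK ITERATOR operation in the resulting plan is the point at which the co-ordinator hands out rowid-range granules to the scanning slaves.

select  /*+ parallel(t1 4) full(t1) */
        count(*)
from    t1
;

-- report the last plan from this session, including the PX columns (TQ, IN-OUT, PQ Distrib)
select * from table(dbms_xplan.display_cursor(null, null, 'basic +parallel'));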
But if we have to join table X and table Y, how do we ensure that the 1/Nth of table Y that a slave reads is the bit associated with the 1/Nth of table X that it has read so far? The answer is that we can’t, and that’s why distribution mechanisms are important and why (for simplicity, probably) Oracle generally ends up using 2 sets of N parallel execution slaves when running with degree N. One set of parallel execution slaves reads the first table as efficiently as possible and then (typically) distributes the data to the other set of parallel execution slaves using a mechanism that tries to anticipate the most effective strategy for reading the second set of data.
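If you want to see the two slave sets and the distribution in action, one (hedged) option is to query v$pq_tqstat from the same session immediately after the parallel query completes – it reports the rows and bytes that each producer and consumer slave handled at each “table queue”:

-- run this in the same session, straight after the parallel query finishes
select  dfo_number, tq_id, server_type, process, num_rows, bytes
from    v$pq_tqstat
order by
        dfo_number, tq_id, server_type desc, process
;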
So, for example, if the optimizer estimates that table X is 100MB but we’re only going to find 100 relevant rows in it, while table Y will return 100,000 rows from 10GB of data, the plan is likely to be:
- Slave set 1 – share and scan table X and copy every relevant row you find to every slave in slave set 2 (broadcast)
- Slave set 2 – build an in-memory hash table with the rowsource from slave set 1, then share and scan table Y and probe your hash table
We generate the right answer with this “(broadcast, none)” distribution because every relevant row from table X is known to every slave in the second slave set, so there is no risk of this distribution method “losing” a join between a row in table X and a row in table Y that should have happened. The plan (probably) shows good scalability – i.e. time to complete is inversely proportional to degree N – because our two tables are both split into N separate areas with N slaves scanning them concurrently. The performance threat comes from the cost of duplicating the 100 rows to each of the slaves in the second set – which is probably a small overhead in CPU and memory, with a little contention due to the concurrent “inter-process chatter” between the two sets of slaves (every slave in set 1 spends a little time talking to – or wanting to talk to – every slave in set 2).
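If you want to force (or simply recognise) this pattern, here’s a minimal sketch – the tables x and y, their columns, and the degree of 4 are all hypothetical; the pq_distribute() hint asks for x (the build rowsource) to be broadcast while y is scanned by the second slave set with no redistribution:

select  /*+
            parallel(x 4) parallel(y 4)
            leading(x y) use_hash(y)
            pq_distribute(y broadcast none)   -- broadcast x, don't redistribute y
        */
        x.small_vc, y.padding
from    x, y
where   x.filter_col = 0
and     y.id = x.id
;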
Alternatively, if the optimizer estimates that table X will return 100,000 rows and table Y will return 1,000,000 rows, then it may decide that there is too much data coming from table X to follow the broadcast strategy – which would end up with every slave in the second set receiving, and building a hash table from, 100,000 rows. In this case we may see:
- Slave set 1 – scan table X, apply a “randomising” hash function (such as mod(join_column, N)) to the join columns and use the result to share the data evenly (we hope) across the second set of slaves (hash distribution)
- Slave set 2 – build a hash table in memory from the data it receives from slave set 1
- Slave set 1 – scan table Y, apply the same hash function to the join columns and again use the result to share the data out to the second set of slaves
- Slave set 2 – receive the incoming data, use it to probe the local in-memory hash table
In this case we know we will get the right answer because if two rows are supposed to join then the nature of the “(hash, hash)” distribution means that both rows will have been sent to the same parallel execution slave in slave set 2. Note that in this case slave set 1 has scanned both tables – but we still only scan each table once, and each scan is shared evenly between N non-interfering slaves. The performance trade-off is about memory, CPU and (particularly) inter-process contention.
- On the plus side: we have minimised the memory usage by sharing the data that goes into the hash table across N slaves rather than having every slave build the whole hash table; we have minimised the (first pass of) messaging by sending each row from each slave in the first set to just one slave in the second set, rather than sending every row to every slave.
- On the minus side: the first set of slaves has had to pass a large number of rows from table Y to the second set of slaves – which increases the CPU usage and increases the scope for contention in the inter-process chatter between the two sets of slaves.

This is where the Bloom filter can come into play. If, for example, the optimizer thinks table X will return 100,000 rows and table Y will (based on the non-join predicates) return 1,000,000 rows, but the join will eliminate 90% of those rows, it may direct the second set of slaves to create a (small) Bloom filter from the values each has received and pass this back to the first set of slaves, which can then use the filter to eliminate a lot of the rows from table Y that would otherwise be passed to the second set of slaves. We do not expect Bloom filters to be perfect – they will eliminate only rows that will not be needed, but they may not manage to eliminate all of those rows – the data that reaches the second set of slaves may still contain some rows that will disappear on the join, but it will certainly contain all the rows that should have survived.
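To pull the last two ideas together, here’s one more hedged sketch using the same hypothetical tables: pq_distribute(y hash hash) requests the “(hash, hash)” distribution described above, and px_join_filter(y) asks Oracle to build a Bloom filter from the build rowsource and apply it to the rows from y before they are distributed. In the plan this shows up as a JOIN FILTER CREATE / JOIN FILTER USE pair with a name like :BF0000.

select  /*+
            parallel(x 4) parallel(y 4)
            leading(x y) use_hash(y)
            pq_distribute(y hash hash)        -- hash-distribute both rowsources
            px_join_filter(y)                 -- create/use a Bloom filter against y
        */
        x.small_vc, y.padding
from    x, y
where   x.filter_col = 0
and     y.id = x.id
;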
That completes my brief outline of the absolute basics of the interaction of parallel execution slaves and what they are trying to achieve. In the next article I’ll talk about “table queues” and the order of operation. (Comment 7 below lists the rest of the installments in this series.)
Jonathan, are we reading part of your next book’s chapter? Looking forward to the next part of the mini-series. There is no question you will provide some very interesting and useful details again.
Comment by Pavol Babel — October 13, 2013 @ 10:40 pm BST
Thanks Jonathan, I had been looking for an explanation like this for a long time.
Comment by Todor Botev — October 15, 2013 @ 8:46 am BST
[…] close to me and will couple it to the work (blog articles and webinars) done by Randolph Geist and recent articles written by Jonathan Lewis to definitely master the parallel query in both single instance and […]
Pingback by You want a bridge from single instance to RAC? this book is for you | Mohamed Houri’s Oracle Notes — October 16, 2013 @ 2:21 pm BST
Jonathan,
Could you please provide more details regarding “on the Exadata database machine, a very large I/O requirement can be split across all the storage servers concurrently”? As far as I know the recommended AU size for Exadata is 4M. Does Oracle ever try to read more data in one system call? Moreover, ASM always reads the primary data mirror. How could it split one large IO request between two or more cells? Thanks in advance
Comment by Pavol Babel — October 17, 2013 @ 9:51 pm BST
Pavol,
This is just making the point that a single session is able to dispatch asynchronous calls to every storage cell simultaneously.
Comment by Jonathan Lewis — October 17, 2013 @ 10:02 pm BST
Hi Jonathan,
of course, a single session is able to take advantage of async IO even for single-session sequential reads in 11g. However Oracle is able to perform outstanding IO on any platform which supports asynchronous IO, and it can use up to (at least) 32 slots for tracking it. Well, it is nothing new for you (I think you were the first who pointed out the statistic “total number of slots”), but it could be interesting for someone else.
There is only one thing which was probably not stated correctly: “in effect, get pretty close to executing at parallel degree 14”. I do not have a FULL RACK to test (“only” 1/4 RACKs), however one session is able to effectively dispatch more than one outstanding IO to one cell, which means the effective parallel degree could be even higher than 14 in the case of a FULL RACK.
Moreover, it is not important whether you have Exadata and storage cells at all. It is still quite common to have 300 or 600 disks in only one enterprise array (e.g. IBM DS 8k, EMC VMAX, HP XP, 3PAR, ..) and Oracle is able to dispatch 32 outstanding IOs “simultaneously”.
Regards
Pavol Babel
Comment by Pavol Babel — January 27, 2014 @ 10:36 pm GMT
I have checked on an Exadata 1/8 Rack: Oracle is able to dispatch at least 6 outstanding IO requests simultaneously (however the classic adaptive IO algorithm is not traceable as it is on conventional HW – “total number of slots” is always zero on Exadata).
Comment by Pavol Babel — January 27, 2014 @ 10:42 pm GMT
Could you kindly explain this “historic 9:3:1 strategy”? I wonder if it has anything to do with what I’m thinking of… It might explain the persistent out-of-balance distribution of workload to parallel slaves that I often see when two slave sets are used. (It used to be worse in 9i, then improved somewhat in 10g.)
Comment by Jason Bucata — October 20, 2013 @ 3:00 am BST
Jason,
When I said “historic” I meant “really old” – I think this strategy disappeared with Oracle 7, though I could be wrong. To run a parallel query Oracle would split an object into N sections based on the degree of parallelism, then split each section into 3 pieces in a 9:3:1 ratio – and each slave would work from its largest to its smallest piece. The idea was that if a slave finished processing all its pieces it could take on some of the smaller pieces from other slaves, theoretically improving the balance of completion time for the entire slave set. This soon disappeared and Oracle changed the strategy to break a segment into 13N pieces of equal size – and I always assumed that they were simply re-using the code that derived the “Unit size” of the original 9:3:1 split.
Your problem is more likely to be related either to bad luck (all the interesting data happens to reside in a small part of a critical table) or to defects in the Ranger algorithm that Oracle uses for Order by or (sort) Group by operations. The Query Coordinator will sample data from a slave set to decide how to range-distribute it to the next set of slaves, but the sample is small, and if the data is already fairly well ordered the ranging decision can be very bad.
Christo Kutrovsky spent some time talking about this in his OpenWorld presentation this year. The PowerPoint slides are currently available at this URL.
Comment by Jonathan Lewis — October 20, 2013 @ 11:36 am BST
[…] I’d put up a reference post describing the set of tables I used to generate the plan from the previous post, and the query (with serial execution plan) that produced it. The setup is a simple star schema […]
Pingback by Parallel Execution – 2 | Oracle Scratchpad — March 5, 2015 @ 7:50 am GMT
[…] Parallel Execution Plans (Oct 2013): The first installment of a series on reading parallel plans […]
Pingback by Parallel Execution Catalogue | Oracle Scratchpad — November 17, 2022 @ 11:27 am GMT
[…] Parallel Execution Plans (Oct 2013): The first installment of a series on reading parallel plans […]
Pingback by Execution Plans Catalogue | Oracle Scratchpad — November 17, 2022 @ 11:31 am GMT